我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Redis源码分析之set 和 sorted set 使用

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Redis源码分析之set 和 sorted set 使用

set 和 sorted set

前言

前面在几个文章聊到了 list,string,hash 等结构的实现,这次来聊一下 set 和 sorted set 的细节。

set

Redis 的 Set 是 String 类型的无序集合,集合成员是唯一的。

底层实现主要用到了两种数据结构 hashtable 和 inset(整数集合)。

集合中最大的成员数为2的32次方-1 (4294967295, 每个集合可存储40多亿个成员)。

常见命令

来看下几个常用的命令

# 向集合添加一个或多个成员
SADD key member1 [member2]

# 获取集合的成员数
SCARD key
# 返回第一个集合与其他集合之间的差异。
SDIFF key1 [key2]
# 返回给定所有集合的差集并存储在 destination 中
SDIFFSTORE destination key1 [key2]
# 返回给定所有集合的交集
SINTER key1 [key2]
# 返回给定所有集合的交集并存储在 destination 中
SINTERSTORE destination key1 [key2]
# 判断 member 元素是否是集合 key 的成员
SISMEMBER key member
# 返回集合中的所有成员
SMEMBERS key
# 将 member 元素从 source 集合移动到 destination 集合
SMOVE source destination member
# 移除并返回集合中的一个随机元素
SPOP key
# 返回集合中一个或多个随机数
SRANDMEMBER key [count]
# 移除集合中一个或多个成员
SREM key member1 [member2]
# 返回所有给定集合的并集
SUNION key1 [key2]
# 所有给定集合的并集存储在 destination 集合中
SUNIONSTORE destination key1 [key2]
# 迭代集合中的元素
SSCAN key cursor [MATCH pattern] [COUNT count]

来个栗子

127.0.0.1:6379>  SADD set-test xiaoming
(integer) 1
(integer) 0
127.0.0.1:6379>  SADD set-test xiaoming1
127.0.0.1:6379>  SADD set-test xiaoming2

127.0.0.1:6379> SMEMBERS set-test
1) "xiaoming2"
2) "xiaoming"
3) "xiaoming1"

上面重复值的插入,只有第一次可以插入成功

set 的使用场景

比较适用于聚合分类

1、标签:比如我们博客网站常常使用到的兴趣标签,把一个个有着相同爱好,关注类似内容的用户利用一个标签把他们进行归并。

2、共同好友功能,共同喜好,或者可以引申到二度好友之类的扩展应用。

3、统计网站的独立IP。利用set集合当中元素不唯一性,可以快速实时统计访问网站的独立IP。

不过对于 set 中的命令要合理的应用,不然很容易造成慢查询

1、使用高效的命令,比如说,如果你需要返回一个 SET 中的所有成员时,不要使用 SMEMBERS 命令,而是要使用 SSCAN 多次迭代返回,避免一次返回大量数据,造成线程阻塞。

2、当你需要执行排序、交集、并集操作时,可以在客户端完成,而不要用SORT、SUNION、SINTER这些命令,以免拖慢 Redis 实例。

看下源码实现

这里来看下 set 中主要用到的数据类型

代码路径https://github.com/redis/redis/blob/6.2/class="lazy" data-src/t_set.c

void saddCommand(client *c) {
    robj *set;
    int j, added = 0;

    set = lookupKeyWrite(c->db,c->argv[1]);
    if (checkType(c,set,OBJ_SET)) return;
    
    if (set == NULL) {
        set = setTypeCreate(c->argv[2]->ptr);
        dbAdd(c->db,c->argv[1],set);
    }
    for (j = 2; j < c->argc; j++) {
        if (setTypeAdd(set,c->argv[j]->ptr)) added++;
    if (added) {
        signalModifiedKey(c,c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
    server.dirty += added;
    addReplyLongLong(c,added);
}
// 当 value 是整数类型使用 intset
// 否则使用哈希表
robj *setTypeCreate(sds value) {
    if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
        return createIntsetObject();
    return createSetObject();

int setTypeAdd(robj *subject, sds value) {
    long long llval;
    // 如果是 OBJ_ENCODING_HT 说明是哈希类型
    if (subject->encoding == OBJ_ENCODING_HT) {
        dict *ht = subject->ptr;
        dictEntry *de = dictAddRaw(ht,value,NULL);
        if (de) {
            // 设置key ,value 设置成null
            dictSetKey(ht,de,sdsdup(value));
            dictSetVal(ht,de,NULL);
            return 1;
        }
    // OBJ_ENCODING_INTSET 代表是 inset
    } else if (subject->encoding == OBJ_ENCODING_INTSET) {
        if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
            uint8_t success = 0;
            subject->ptr = intsetAdd(subject->ptr,llval,&success);
            if (success) {
                
                // 如果条目过多将会转换成集合
                size_t max_entries = server.set_max_intset_entries;
                
                if (max_entries >= 1<<30) max_entries = 1<<30;
                if (intsetLen(subject->ptr) > max_entries)
                    setTypeConvert(subject,OBJ_ENCODING_HT);
                return 1;
            }
        } else {
            
            setTypeConvert(subject,OBJ_ENCODING_HT);
            
            serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
    } else {
        serverPanic("Unknown set encoding");
    return 0;

通过上面的源码分析,可以看到

1、set 中主要用到了 hashtable 和 inset;

2、如果存储的类型是整数类型就会使用 inset,否则使用 hashtable;

3、使用 inset 有一个最大的限制,达到了最大的限制,也是会使用 hashtable;

再来看下 inset 数据结构

代码地址https://github.com/redis/redis/blob/6.2/class="lazy" data-src/intset.h

typedef struct intset {
    // 编码方法,指定当前存储的是 16 位,32 位,还是 64 位的整数
    uint32_t encoding;
    uint32_t length;
    // 实际保存元素的数组
    int8_t contents[];
} intset;
insert

来看下 intset 的数据插入


intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
    // 计算value的编码长度
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 1;

    
    // 如果value的编码长度大于当前的编码位数,进行升级
    if (valenc > intrev32ifbe(is->encoding)) {
        
        return intsetUpgradeAndAdd(is,value);
    } else {
        
        // 当前值不存在的时候才进行插入  
        if (intsetSearch(is,value,&pos)) {
            if (success) *success = 0;
            return is;
        }
        is = intsetResize(is,intrev32ifbe(is->length)+1);
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
    }
    // 数据插入
    _intsetSet(is,pos,value);
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}

intset 的数据插入有一个数据升级的过程,当一个整数被添加到整数集合时,首先需要判断下 新元素的类型和集合中现有元素类型的长短,如果新元素是一个32位的数字,现有集合类型是16位的,那么就需要将整数集合进行升级,然后才能将新的元素加入进来。

这样做的优点:

1、提升整数集合的灵活性,可以随意的添加整数,而不用关心整数的类型;

2、可以尽可能的节约内存。

了解完数据的插入再来看下 intset 中是如何来快速的搜索里面的数据


// 如果找到了对应的数据,返回 1 将 pos 设置为对应的位置
// 如果找不到,返回0,设置 pos 为可以为数据可以插入的位置
// intset 中的数据是排好序的,所以使用二分查找来寻找对应的元素  
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
    int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
    int64_t cur = -1;

    
    if (intrev32ifbe(is->length) == 0) {
        if (pos) *pos = 0;
        return 0;
    } else {
        
        if (value > _intsetGet(is,max)) {
            if (pos) *pos = intrev32ifbe(is->length);
            return 0;
        } else if (value < _intsetGet(is,0)) {
            if (pos) *pos = 0;
        }
    }
    // 使用二分查找
    while(max >= min) {
        mid = ((unsigned int)min + (unsigned int)max) >> 1;
        cur = _intsetGet(is,mid);
        if (value > cur) {
            min = mid+1;
        } else if (value < cur) {
            max = mid-1;
        } else {
            break;
    if (value == cur) {
        if (pos) *pos = mid;
        return 1;
        if (pos) *pos = min;
}

可以看到这里用到的是二分查找,intset 中的数据本身也就是排好序的

dict

来看下 dict 的数据结构

typedef struct dict {
    dictType *type;
    void *privdata;
    // 哈希表数组,长度为2,一个正常存储数据,一个用来扩容
    dictht ht[2];
    long rehashidx; 
    int16_t pauserehash; 
} dict;

// 哈希表结构,通过两个哈希表使用,实现增量的 rehash  
typedef struct dictht {
    dictEntry **table;
    // hash 容量大小
    unsigned long size;
    // 总是等于 size - 1,用于计算索引值
    unsigned long sizemask;
    // 实际存储的 dictEntry 数量
    unsigned long used;
} dictht;
//  k/v 键值对节点,是实际存储数据的节点  
typedef struct dictEntry {
    // 键对象,总是一个字符串类型的对象
    void *key;
    union {
        // void指针,这意味着它可以指向任何类型
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    // 指向下一个节点
    struct dictEntry *next;
} dictEntry;

可以看到 dict 中,是预留了两个哈希表,来处理渐进式的 rehash

rehash 细节参考  redis 中的字典

再来看下 dict 数据的插入

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    long index;
    dictEntry *entry;
    dictht *ht;

    if (dictIsRehashing(d)) _dictRehashStep(d);
    
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;
    
    // 这里来判断是否正在 Rehash 中
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;
    
    // 插入具体的数据  
    dictSetKey(d, entry, key);
    return entry;
}

这里重点来分析下 Rehash 的过程

int dictRehash(dict *d, int n) {
    int empty_visits = n*10; 
    // 这里来判断是否正在 Rehash 中
    if (!dictIsRehashing(d)) return 0;

    // used 实际存储的 dictEntry 数量
    // 如果有数据进行下面的迁移
    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;
        
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        // 获取老数据中的数据
        de = d->ht[0].table[d->rehashidx];
        
        while(de) {
            uint64_t h;
            nextde = de->next;
            
            // 获取新哈希表的索引
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            // 将数据放入到新的 dity 中
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }
    
    // 这里来检测是否完成了整个的 rehash 操作  
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);
        // 将 ht[1] 的内容放入到 ht[0] 中,ht[1] 作为备用,下次 rehash 使用
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        d->rehashidx = -1;
        return 0;
    
    return 1;
}
// 使用增量的方式 rehash 
// 当然数据很大的话,一次迁移所有的数据,显然是不合理的,会造成Redis线程阻塞,无法服务其他请求。这里 Redis 使用的是渐进式 rehash。
// 在 rehash 期间,每次执行添加,删除,查找或者更新操作时,除了对命令本身的处理,还会顺带将哈希表1中的数据拷贝到哈希表2中。从索引0开始,每执行一次操作命令,拷贝一个索引位置的数据。  
// 如果没有读入和写入操作,这时候就不能进行 rehash
// 所以会定时执行一定数量的 rehash 操作  
int incrementallyRehash(int dbid) {
    
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; 
    
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
    return 0;

1、rehash 的过程 Redis 默认使用了两个全局哈希表;

2、rehash 的过程是渐进式的,因为如果数据量很大的话,一次迁移所有的数据,会造成Redis线程阻塞,无法服务其他请求;

3、在进行 rehash 期间,删除,查找或者更新操作都会在两个哈希表中执行,添加操作就直接添加到哈希表2中了。查找会把两个哈希表都找一遍,直到找到或者两个都找不到;

4、如果在 reash 期间,如果没有读写操作,这时候,就不能迁移工作了,所以后台定时执行一定数量的数据迁移。

sorted set

sorted set有序集合和集合一样也是 string 类型元素的集合,同时也不允许有重复的成员。

不同的是sorted set中的每个元素都会关联一个 double 类型的分数,sorted set通过这个分数给集合中的成员进行从小到大的排序。有序集合中的成员是唯一的,关联的 score 可以重复。

常见的命令

下面看下有序集合中常见的命令

# 向有序集合添加一个或多个成员,或者更新已存在成员的分数
ZADD key score1 member1 [score2 member2]

# 获取有序集合的成员数
ZCARD key
# 计算在有序集合中指定区间分数的成员数
ZCOUNT key min max
# 有序集合中对指定成员的分数加上增量 increment
ZINCRBY key increment member
# 计算给定的一个或多个有序集的交集并将结果集存储在新的有序集合 destination 中
ZINTERSTORE destination numkeys key [key ...]
# 在有序集合中计算指定字典区间内成员数量
ZLEXCOUNT key min max
# 通过索引区间返回有序集合指定区间内的成员
ZRANGE key start stop [WITHSCORES]
# 通过字典区间返回有序集合的成员
ZRANGEBYLEX key min max [LIMIT offset count]
# 通过分数返回有序集合指定区间内的成员
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT]
# 返回有序集合中指定成员的索引
ZRANK key member
# 移除有序集合中的一个或多个成员
ZREM key member [member ...]
# 移除有序集合中给定的字典区间的所有成员
ZREMRANGEBYLEX key min max
# 移除有序集合中给定的排名区间的所有成员
ZREMRANGEBYRANK key start stop
# 移除有序集合中给定的分数区间的所有成员
ZREMRANGEBYSCORE key min max
# 返回有序集中指定区间内的成员,通过索引,分数从高到低
ZREVRANGE key start stop [WITHSCORES]
# 返回有序集中指定分数区间内的成员,分数从高到低排序
ZREVRANGEBYSCORE key max min [WITHSCORES]
# 返回有序集合中指定成员的排名,有序集成员按分数值递减(从大到小)排序
ZREVRANK key member
# 返回有序集中,成员的分数值
ZSCORE key member
# 计算给定的一个或多个有序集的并集,并存储在新的 key 中
ZUNIONSTORE destination numkeys key [key ...]
# 迭代有序集合中的元素(包括元素成员和元素分值)
ZSCAN key cursor [MATCH pattern] [COUNT count]

来个栗子

127.0.0.1:6379> ZADD test-sset 1 member1
(integer) 1
127.0.0.1:6379> ZADD test-sset 2 member2
(integer) 1
127.0.0.1:6379> ZADD test-sset 3 member3
(integer) 1
127.0.0.1:6379> ZADD test-sset 3 member3
(integer) 0
127.0.0.1:6379> ZADD test-sset 4 member3
(integer) 0
127.0.0.1:6379> ZADD test-sset 5 member5
(integer) 1
127.0.0.1:6379> ZRANGE test-sset 0 10 WITHSCORES
1) "member1"
2) "1"
3) "member2"
4) "2"
5) "member3"
6) "4"
7) "member5"
8) "5"

使用场景

来看下sorted set的使用场景

1、通过 score 的排序功能,可以实现类似排行榜,学习成绩的排序功能;

2、也可以实现带权重队列,比如普通消息的 score 为1,重要消息的 score 为2,然后工作线程可以选择按 score 的倒序来获取工作任务。让重要的任务优先执行;

3、也可以实现一个延迟队列,将 score 存储过期时间,从小到大排序,最靠前的就是最先过期的。

分析下源码实现

sorted set 中的代码主要在下面的两个文件中

结构定义:server.h

实现:t_zset.c

先来看下sorted set的数据结构

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;

typedef struct zskiplist {
    // 头,尾节点
    struct zskiplistNode *header, *tail;
    // 节点数量
    unsigned long length;
    // 目前表内节点的最大层数
    int level;
} zskiplist;

// ZSETs 使用的是特定版的 Skiplists
typedef struct zskiplistNode {
    // 这里使用 sds 存储具体的元素
    sds ele;
    // 元素的权重
    double score;
    // 后向指针(为了便于从跳表的尾节点倒序查找)
    struct zskiplistNode *backward;
    // 层
    // 保存着指向其他元素的指针。高层的指针越过的元素数量大于等于低层的指针,为了提高查找的效率,程序总是从高层先开始访问,然后随着元素值范围的缩小,慢慢降低层次。
    struct zskiplistLevel {
        // 节点上的前向指针
        struct zskiplistNode *forward;
        // 跨度
        // 用于记录两个节点之间的距离
        unsigned long span;
    } level[];
} zskiplistNode;

看上面的数据结构可以发现sorted set的实现主要使用了 dict 和 zskiplist 两种数据结构。不过sorted set在元素较少的情况下使用的压缩列表,具体细节参见下文的 zsetAdd 函数。

ZADD

来看下 ZADD 的插入

// 代码地址 https://github.com/redis/redis/blob/6.2/class="lazy" data-src/t_zset.c
// ZADD 命令
void zaddCommand(client *c) {
    zaddGenericCommand(c,ZADD_IN_NONE);
}


void zaddGenericCommand(client *c, int flags) {
    ...
    
    zobj = lookupKeyWrite(c->db,key);
    if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
    if (zobj == NULL) {
        if (xx) goto reply_to_client; 
        // 超过阈值(zset-max-ziplist-entries、zset-max-ziplist-value)后,使用 hashtable + skiplist 存储
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            zobj = createZsetObject();
        } else {
            zobj = createZsetZiplistObject();
        }
        dbAdd(c->db,key,zobj);
    }
// 看下具体的插入过程
int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) {
    
    int incr = (in_flags & ZADD_IN_INCR) != 0;
    int nx = (in_flags & ZADD_IN_NX) != 0;
    int xx = (in_flags & ZADD_IN_XX) != 0;
    int gt = (in_flags & ZADD_IN_GT) != 0;
    int lt = (in_flags & ZADD_IN_LT) != 0;
    
    // 如果类型是 ZIPLIST
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *eptr;
        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
             ...
            
            // 当分数改变时移除并重新插入
            if (score != curscore) {
                zobj->ptr = zzlDelete(zobj->ptr,eptr);
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                *out_flags |= ZADD_OUT_UPDATED;
            }
            return 1;
        // 新元素
        } else if (!xx) {
            
            // 如果元素过大就使用跳表
            if (zzlLength(zobj->ptr)+1 > server.zset_max_ziplist_entries ||
                sdslen(ele) > server.zset_max_ziplist_value ||
                !ziplistSafeToAdd(zobj->ptr, sdslen(ele)))
            {
                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            } else {
                if (newscore) *newscore = score;
                *out_flags |= ZADD_OUT_ADDED;
                return 1;
            *out_flags |= ZADD_OUT_NOP;
    
    // 表示使用的类型是跳表
    if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplistNode *znode;
        dictEntry *de;
        // 在哈希表中查找元素
        de = dictFind(zs->dict,ele);
        if (de != NULL) {
            
            if (nx) {
                *out_flags |= ZADD_OUT_NOP;
            // 哈希表中获取元素的权重
            curscore = *(double*)dictGetVal(de);
            
            // 更新权重值
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *out_flags |= ZADD_OUT_NAN;
                    return 0;
                }
            
            if ((lt && score >= curscore) || (gt && score <= curscore)) {
            if (newscore) *newscore = score;
            
            // 如果权重发生变化了
                znode = zslUpdateScore(zs->zsl,curscore,ele,score);
                
                dictGetVal(de) = &znode->score; 
        // 如果新元素不存在
            ele = sdsdup(ele);
            // 插入到跳表节点
            znode = zslInsert(zs->zsl,score,ele);
            // 在哈希表中插入
            serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
            *out_flags |= ZADD_OUT_ADDED;
    } else {
        serverPanic("Unknown sorted set encoding");
    return 0; 

sorted set 的插入使用了两种策略

1、如果插入的数据量和长度没有达到阀值,就使用压缩列表进行保存,反之就使用跳表加哈希表的组合方式进行保存;

2、压缩列表本身是就不适合保存过多的元素,所以达到阀值使用跳表加哈希表的组合方式进行保存;

3、这里跳表加哈希表的组合方式也是很巧妙的,跳表用来进行范围的查询,通过哈希表来实现单个元素权重值的查询,组合的方式提高了查询的效率;

ZRANGE

看完了插入函数,这里再来分析下 ZRANGE

// 获取有序集合中, 指定数据的排名.
// 若reverse==0, 排名以得分升序排列. 否则排名以得分降序排列.
// 第一个数据的排名为0, 而不是1
// 使用压缩列表或者跳表,里面的数据都是排好序的
long zsetRank(robj *zobj, sds ele, int reverse) {
    unsigned long llen;
    unsigned long rank;

    llen = zsetLength(zobj);
    // 压缩列表
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        eptr = ziplistIndex(zl,0);
        serverAssert(eptr != NULL);
        sptr = ziplistNext(zl,eptr);
        serverAssert(sptr != NULL);
        rank = 1;
        while(eptr != NULL) {
            if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele)))
                break;
            rank++;
            zzlNext(zl,&eptr,&sptr);
        }
        if (eptr != NULL) {
             // 逆向取rank
             // 返回后面的数据
            if (reverse)
                return llen-rank;
            else
                return rank-1;
        } else {
            return -1;
    // 跳表
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        dictEntry *de;
        double score;
        de = dictFind(zs->dict,ele);
        if (de != NULL) {
            score = *(double*)dictGetVal(de);
            // 查找包含给定分值和成员对象的节点在跳跃表中的排位
            rank = zslGetRank(zsl,score,ele);
            
            serverAssert(rank != 0);
            // 逆向取rank
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}

unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        // 遍历所有对比的节点
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                sdscmp(x->level[i].forward->ele,ele) <= 0))) {
            rank += x->level[i].span;
            // 沿着前进指针遍历跳跃表
            x = x->level[i].forward;
        
        if (x->ele && sdscmp(x->ele,ele) == 0) {
            return rank;
    return 0;

通过索引区间返回有序集合指定区间内的成员,因为数据在插入的时候,已经按照从小到大进行了,排序,所以返回指定区间的成员,遍历对应的数据即可。

总结

1、Redis 的 Set 是 String 类型的无序集合,集合成员是唯一的;

2、sorted set有序集合和集合一样也是 string 类型元素的集合,同时也不允许有重复的成员。不同的是sorted set中的每个元素都会关联一个 double 类型的分数;

3、set 底层实现主要用到了两种数据结构 hashtable 和 inset(整数集合);

4、sorted set在元素较少的情况下使用的压缩列表存储数据,数据量超过阀值的时候 使用 dict 加 zskiplist 来存储数据;

4、跳表加哈希表的组合方式也是很巧妙的,跳表用来进行范围的查询,通过哈希表来实现单个元素权重值的查询,组合的方式提高了查询的效率。

参考

【Redis核心技术与实战】https://time.geekbang.org/column/intro/100056701
【Redis设计与实现】https://book.douban.com/subject/25900156/
【redis 集合(set)类型的使用和应用场景】https://www.oraclejsq.com/redisjc/040101720.html
【跳跃表】https://redisbook.readthedocs.io/en/latest/internal-datastruct/skiplist.html
【Redis学习笔记】https://github.com/boilingfrog/Go-POINT/tree/master/redis
【Redis 中的 set 和 sorted set 如何使用】https://boilingfrog.github.io/2022/03/21/Redis中的set和sortedset/

到此这篇关于Redis源码分析之set 和 sorted set 使用的文章就介绍到这了,更多相关Redis set 和 sorted set使用内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Redis源码分析之set 和 sorted set 使用

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

java集合类源码分析之Set详解

Set集合与List一样,都是继承自Collection接口,常用的实现类有HashSet和TreeSet。值得注意的是,HashSet是通过HashMap来实现的而TreeSet是通过TreeMap来实现的,所以HashSet和TreeS
2023-05-31

Spring注解之@Import注解的使用和源码分析

今天主要介绍Spring@Import注解,在Spring中@Import使用得比较频繁,它得作用是导入bean,具体的导入方式有多种,特别在SpringBoot项目中,很多地方都使用到了@Import注解,感兴趣的小伙伴可以参考阅读
2023-05-16

Java之Spring Bean作用域和生命周期源码分析

这篇文章主要讲解了“Java之Spring Bean作用域和生命周期源码分析”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java之Spring Bean作用域和生命周期源码分析”吧!Bea
2023-07-05

nginx slice模块的使用和源码分析小结

Nginxslice模块用于分片并按需流传输大型文件。通过配置sliceon和slice_max_size,可以启用此模块。该模块包含ngx_http_slice_filter_module(分片文件)和ngx_http_slice_stream_module(流式传输分片)。其性能优化包括并发传输、范围请求和分片缓存。slice模块通过分片文件、支持范围请求和并发传输,有效地优化了流化性能,使其成为流媒体应用程序的理想选择。
nginx slice模块的使用和源码分析小结
2024-04-02

Android源码面试宝典之JobScheduler从使用到原理分析(一)【JobScheduler的使用】

我们之前总结过HandlerThread、IntentService,http://t.csdn.cn/U7Qzr,知道了,在子线程执行一些定时任务,android已经给我们提供了现成的一些API。但是我们也知道,这些老的API随着an
2023-08-19

Android源码面试宝典之JobScheduler从使用到原理分析(四)【JobScheduler、StateController 】

上文,从Job任务的创建,到如何与JSC(JobServiceCotext)关联,到具体任务的绑定、执行进行了源码探索,相信大家到现在为止,对于JobScheduler的庐山真面目,脑海中已经有了些许自己的轮廓。但是,我们也知道,目前为
2023-08-19

Android源码面试宝典之JobScheduler从使用到原理分析(二)【JSS的启动】

上文,我们以IntentService入手,先对JobScheduler进行了简单的实例编码使用。本文开始,我们开始就源码入手,开始深入学习、总结JobScheduler的内部实现原理。 前言 我们从使用代码入手,通过阅读JobSch
2023-08-16

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录