草庐IT

Redis数据结构之——跳表skiplist

基层搬砖的Panda 2023-04-18 原文

写在前面

以下内容是基于Redis 6.2.6 版本整理总结

一、跳表(skiplist)

如何理解跳表?在了解跳表之前,我们先从普通链表开始,一点点揭开跳表的神秘面纱~

首先,普通单链表来说,即使链表是有序的,我们要查找某个元素,也需要从头到尾遍历整个链表。这样效率很低,时间复杂度是O(n)。


那么有没有方法提升查询效率呢?我们可以尝试为链表建立“索引”来提升查询效率。如下图,我们在原始链表的基础上,每两个元素提取一个索引,down指向原始链表的节点:

此时,假如我们要查询值为19的节点,我们从索引层开始遍历,当遍历到16时,下个节点的值为23,所以,19一定在这两个节点之间。我们通过16节点的down指针来到原始链表,将继续遍历,直到找到值为19的节点。在没有建“索引”之前,我们需要遍历8次,才能找到19,而在建立“索引”后,需要6次就能找到,也就是,索引帮我们减少了查询的次数。

那如果我们再建一级索引呢?哈哈哈,没想到吧也是6次,这是因为我们的数据量太少,即便加了两级索引,优化效果也不是很明显。在数据量大时,优化效果还是很明显的,有兴趣可以自己动手画一画。

1.1 跳表的时间复杂度

假设链表有n个节点,每两个节点生成一个索引,则有第一层索引节点的个数为n/2,第二层索引节点的个数是第一层个数的一半n/4,以此类推,第h层节点的个数就是n/(2^h)。假设,第h层有两个节点,则:h = log2n - 1,再算上原始链表,则整个跳表的高度就是log2n。

我们在查询某个数据的时候,每一层需要遍历m个节点,那么在跳表中查询某个数据的时间复杂度就是:O(m*log2n)。那m是多少呢? 按照上面每两个节点上升一个索引节点的索引结构,我们每一层索引最多遍历3个节点,为什么呢?解释如下:


假设我们查找的还是19,在第k层索引中,当我们遍历到11时,发现19在11和23之间,我们通过11的down节点,来到第k-1层。在第k-1层索引中,11 到 23 最多包含3个节点(包含11 和 23 的节点),所以在第k-1索引,我们最多需要遍历3个节点,依次类推,每一层索引都最多只需要遍历3个节点。

通过上面的分析,我们知道了m = 3,也就是说在跳表中查询任意节点的时间复杂度是O(3*log2n),去掉常数项后,时间复杂度就是:O(log2n)。这个查找跟二分查找的时间复杂度一样。换句话说,我们是基于单链表实现了二分查找,神奇吧。但是,这种查询效率的提升是有代价的,也就是我们需要维护多层级索引,才能实现。这也是一种空间换时间的思路。

1.2 空间复杂度

要实现log2n的时间复杂度,跳表就需要额外存储这些索引的空间。那么,需要多大的空间呢?我们来分析一下:

假设原始链表有n个节点,按照每连个节点上升一个索引节点的索引结构,第一层有n/2,第二层n/4,依次类推,第h层有n/2^h个索引节点。假设第h层有2个节点。则总共有:
n/2 + n/4 + n/8 + … + 2 = n-2。所以,跳表的空间复杂度是O(n)

也就是说,如果将含有n个节点的链表构造成跳表,我们还需要额外再用接近n个节点来存储这些索引,还有没有办法较少索引占的空间呢?答案是有的,上面的分析是基于每两个节点上升一个索引节点,那么换成3个、5个呢?如果为3,也很好分析,需要的索引总数为:n/3 + n/9 + n/27 + … + 3 + 1 = n/2。尽管空间复杂还是O(n),但实际上索引的数量已经减少了一半了。

在实际开发中,原始链表中的对象可能是很大的对象,而索引节点只是存储关键的值和指针,相较于原始节点,大小可以忽略不计。

1.3 跳表的插入和删除

我们想在跳表中插入和删除一个节点,第一步是要找到插入和删除的位置,然后再执行插入或者删除,因为跳表的查询时间复杂度是O(log2n),插入和删除的时间复杂度也是O(log2n)。

1.3.1 插入

1.3.2 删除

删除操作就需要注意一下,如果删除的节点也存在于索引节点中,那么,索引中的节点也要删除。单链表中的删除,需要拿到前驱节点的指针,如果是双向链表就不用考虑了。

1.4 跳表索引的动态更新

当我们一直往跳表中添加元素,如果不更新索引就可能出现,某2个索引之间的索引数过多,极端情况下,会退化为单向链表。

作为一种动态数据结构,我们需要某种手段作为索引节点和原始链表大小的平衡,也就是说,当链表中的节点数增多时,也响应的增加一些索引节点,避免复杂度的退化。红黑树和AVL树是通过左旋和右旋来维持左右子树的平衡。跳表则是通过随机函数来维护这种平衡。

2、跳表在Redis中的应用

有序集合 zet 的底层实现就是跳表。大部分情况下,跳表的效率可以和平衡树媲美,平均时间复杂度O(logn),最坏O(n)。

2.1 跳表源码

每次创建一个新的跳表节点时,会根据幂次定律(越大的数出现的概率越小)随机生成一个介于1到32之间的数作为level数组的大小,这个数组大小就是层的高度。level层数确定源码:

// src/t_zset.c
int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

两个宏定义

// src/sever.h
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */
#define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */

server.h

// 996行
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
	// sds 对象,唯一的
    sds ele;
    // 分值
    double score;
    // 后退指针,用于从后往前遍历使用
    struct zskiplistNode *backward;
    // 层数
    struct zskiplistLevel {
    	// 前进指针
        struct zskiplistNode *forward;
        // 跨度,用来确定本节点再链表中的排位  zrank
        unsigned long span;
    } level[];
} zskiplistNode;

typedef struct zskiplist {
	// 指向跳表头节点和尾节点的指针
    struct zskiplistNode *header, *tail;
    // 跳表中的元素个数,不包含头节点 zcard
    unsigned long length;
    // 跳表中层数最高的节点的层数
    int level;
} zskiplist;

1.3 创建skiplist

zskiplistNode *zslCreateNode(int level, double score, sds ele) {
    zskiplistNode *zn =
        zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->ele = ele;
    return zn;
}

/* Create a new skiplist. */
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

1.4 跳表的插入和删除

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}


// 删除
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            update[i]->level[i].span -= 1;
        }
    }
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}

三、总结

  1. 跳表是有序集合zset的实现之一
  2. 跳表由zskiplist 和 zskiplistNode两个结构组成,zskiplist保存跳表的信息,如表头和表尾节点、跳表的长度等,zskiplistNode 保存节点详细信息
  3. 每个跳表节点的层高都是 1~32 之间的随机数
  4. 跳表中的对象是唯一的
  5. 跳表中的元素是按照分值从小到大排列,当分值相同时,按照成员对象的大小排序

文章参考与<零声教育>的C/C++linux服务期高级架构系统教程学习

有关Redis数据结构之——跳表skiplist的更多相关文章

  1. ruby - 使用 ruby​​ 将 HTML 转换为纯文本并维护结构/格式 - 2

    我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  4. ruby - 是否有用于序列化和反序列化各种格式的对象层次结构的模式? - 2

    给定一个复杂的对象层次结构,幸运的是它不包含循环引用,我如何实现支持各种格式的序列化?我不是来讨论实际实现的。相反,我正在寻找可能会派上用场的设计模式提示。更准确地说:我正在使用Ruby,我想解析XML和JSON数据以构建复杂的对象层次结构。此外,应该可以将该层次结构序列化为JSON、XML和可能的HTML。我可以为此使用Builder模式吗?在任何提到的情况下,我都有某种结构化数据-无论是在内存中还是文本中-我想用它来构建其他东西。我认为将序列化逻辑与实际业务逻辑分开会很好,这样我以后就可以轻松支持多种XML格式。 最佳答案 我最

  5. ruby - 我如何添加二进制数据来遏制 POST - 2

    我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_

  6. 世界前沿3D开发引擎HOOPS全面讲解——集3D数据读取、3D图形渲染、3D数据发布于一体的全新3D应用开发工具 - 2

    无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD

  7. FOHEART H1数据手套驱动Optitrack光学动捕双手运动(Unity3D) - 2

    本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01  客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02  数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit

  8. 使用canal同步MySQL数据到ES - 2

    文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co

  9. ruby-on-rails - 创建 ruby​​ 数据库时惰性符号绑定(bind)失败 - 2

    我正在尝试在Rails上安装ruby​​,到目前为止一切都已安装,但是当我尝试使用rakedb:create创建数据库时,我收到一个奇怪的错误:dyld:lazysymbolbindingfailed:Symbolnotfound:_mysql_get_client_infoReferencedfrom:/Library/Ruby/Gems/1.8/gems/mysql2-0.3.11/lib/mysql2/mysql2.bundleExpectedin:flatnamespacedyld:Symbolnotfound:_mysql_get_client_infoReferencedf

  10. STM32读取串口传感器数据(颗粒物传感器,主动上传) - 2

    文章目录1.开发板选择*用到的资源2.串口通信(个人理解)3.代码分析(注释比较详细)1.主函数2.串口1配置3.串口2配置以及中断函数4.注意问题5.源码链接1.开发板选择我用的是STM32F103RCT6的板子,不过代码大概在F103系列的板子上都可以运行,我试过在野火103的霸道板上也可以,主要看一下串口对应的引脚一不一样就行了,不一样的就更改一下。*用到的资源keil5软件这里用到了两个串口资源,采集数据一个,串口通信一个,板子对应引脚如下:串口1,TX:PA9,RX:PA10串口2,TX:PA2,RX:PA32.串口通信(个人理解)我就从串口采集传感器数据这个过程说一下我自己的理解,

随机推荐