草庐IT

Redis6.0多I/O线程实现原理

LJessie 2023-03-28 原文

在 2020 年 5 月推出的 Redis 6.0 版本中,Redis 在执行模型中还进一步使用了多线程来处理 IO 任务。之前在:https://www.jianshu.com/p/0323fc06a36f 简单讨论过Redis执行命令的过程大致分为:读取命令、解析命令、执行命令、返回结果四个阶段。而多线程处理 IO 任务的目的,就是为了充分利用当前服务器的多核特性,使用多核运行多线程,让多线程帮助加速命令读取、命令解析以及数据写回的速度,提升 Redis 整体性能。

源码地址:https://github.com/redis/redis/tree/6.0/src

1. 基本步骤

1.1. 输入、输出缓冲区

为了避免客户端和服务器端的请求发送和处理速度不匹配,服务器端给每个连接的客户端都设置了一个输入缓冲区和输出缓冲区,我们称之为客户端输入缓冲区和输出缓冲区

输入缓冲区会先把客户端发送过来的命令暂存起来,Redis 主线程再从输入缓冲区中读取命令,进行处理。当 Redis 主线程处理完数据后,会把结果写入到输出缓冲区,再通过输出缓冲区返回给客户端,如下图所示:


1.png

1.2. 多线程处理网络请求

引入多I/O线程优化之后,为了避免多线程访问共享资源造成的线程安全问题,执行命令阶段仍然是在主线程中执行的,而I/O线程只是在读取客户端请求、解析命令、将命令执行结果返回给客户端的时候起作用。而且,在同一时刻,读写客户端命令操作和执行命令操作只有一种在运行。下面用一张表格来简单描述下这个过程:

主线程 I/O线程
T1 接收客户端连接,建立连接socket
T2 把连接socket分配给I/O线程
T3 等待I/O线程读取、解析命令
T4 读取命令
T5 解析命令
T6 执行命令
T7 将结果写到输出缓冲区
T8 等待I/O线程写回客户端
T9 将缓冲区数据写回客户端
T10 I/O线程写回客户端完成,等待后续请求

2. 源码解析

2.1. 数据结构

在Redis中,全局变量都会保存在redisServer结构体类型的变量server中(在server.h文件中),我们的RDB、AOF、主从等配置都是在这个server变量中保存的,而多I/O线程中有一个全局变量io_threads_active,来表示Redis是否开启了多I/O线程:

  • server.io_threads_active = 0,未启动多I/O线程。
  • server.io_threads_active = 1,启动多I/O线程。

server中还有一个变量io_threads_num保存I/O线程的数量。

server中有两个 List 类型的成员变量:clients_pending_write 和 clients_pending_read,它们分别记录了待写回数据的客户端和待读取数据的客户端,如下所示:

struct redisServer {
...
list *clients_pending_write;  //待写回数据的客户端
list *clients_pending_read;  //待读取数据的客户端
...
}

在networking.c文件中,定义了四个数组,用来保存多I/O线程的相关数据:

  • io_threads_list 数组:每个元素是一个List类型的列表,列表保存了每个线程待处理的客户端,比如io_threads_list[0]保存了0号线程要处理的客户端列表;
  • io_threads_pending 数组:保存等待每个 IO 线程处理的客户端个数;
  • io_threads_mutex 数组:保存线程互斥锁,可以对标Java的ReentrantLock,到后面会介绍这个数组的作用;
  • io_threads 数组:保存每个 IO 线程。
pthread_t io_threads[IO_THREADS_MAX_NUM];   //记录线程描述符的数组
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];  //记录线程互斥锁的数组
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];  //记录线程待处理的客户端个数
list *io_threads_list[IO_THREADS_MAX_NUM];  //记录线程对应处理的客户端

networking.c还定义了一个int型的变量io_threads_op,表示I/O线程当前要执行的操作是读操作还是写操作,这个变量也表明所有的I/O线程要么都在读,要么都在写,不可能一部分I/O线程在读,一部分I/O线程在写,该变量有两个值:

  • IO_THREADS_OP_WRITE:这表明该 IO 线程要做的是写操作。
  • IO_THREADS_OP_READ:这表明该 IO 线程要做的是读操作。

每一次客户端连接请求进来的时候,redis都会为这个客户端创建一个client变量,而client有一个属性flags,flags标记了当前客户端的状态,这次仅分析和多I/O线程有关的三个状态:

  • CLIENT_PENDING_READ:有命令等待被读取
  • CLIENT_PENDING_WRITE:等待被写回
  • CLIENT_PENDING_COMMAND:命令已经被解析,等待被执行

2.2. 初始化

redis启动的main函数(在server.c文件中)会执行server的初始化过程,server 在初始化过程的最后,调用 InitSeverLast 函数,而 InitServerLast 函数再进一步调用 initThreadedIO 函数(在networking.c文件中)来完成多I/O线程的初始化操作。具体如下所示:

void InitServerLast() {
    bioInit();
    initThreadedIO();  //调用initThreadedIO函数初始化IO线程
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}

下面来看下initThreadedIO函数的主要执行流程,他主要分为3步:

  1. 设置激活I/O线程的标志为未启动,即server.io_threads_active = 0。
  2. 用两个if来判断是否推出方法:设置的线程数是否为1;设置的线程数是否超过最大阈值。
  3. 初始化I/O线程,即上面提到的networking.c文件中定义的有关I.O线程的数据结构。

具体如下:

/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
    //1. 设置激活I/O线程的标志为未启动
    server.io_threads_active = 0; /* We start with threads not active. */

    //2. 设置的线程数是否为1
    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    if (server.io_threads_num == 1) return;
    //2. 设置的线程数是否超过最大阈值
    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }
    //3.初始化I/O线程
    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}

在初始化I/O线程的时候,会调用pthread_create函数来创建I/O线程,pthread_create可以对标Java的 new Thread(runnable),然后IOThreadMain函数就是I/O线程执行的主函数,I/O线程获取客户端列表,然后根据当前I/O线程的操作类型,来执行读取命令或写回客户端操作。IOThreadMain还有一些线程同步的操作等到后面再讨论。

void *IOThreadMain(void *myid) {
    ......
    while(1) {
        ......
        //获取IO线程要处理的客户端列表
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                //如果线程操作是写操作,则将数据写回客户端
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                //如果线程操作是读操作,则从客户端读取数据
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id);
    }
}

2.3. 读取命令

https://www.jianshu.com/p/0323fc06a36f分析命令执行过程中有提到,redis是在readQueryFromClient函数(networking.c文件)中执行命令读取操作的,而在readQueryFromClient函数一开始,会判断是否开启了I/O线程,

void readQueryFromClient(connection *conn) {
    ......
    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;
}

而在postponeClientRead函数中,有五个判断条件,分别是:

  1. I/O线程被激活,即server.io_threads_active = 1,在2.2分析初始化的时候,io_threads_active 是被设置为0的,而具体什么时候会把他置为1,会在2.5分析线程间同步的时候再讨论。
  2. I/O线程可以用于读取命令,这个变量值是在 Redis 配置文件 redis.conf 中,通过配置项 io-threads-do-reads 设置的,默认值为 no,如果想用多 IO 线程处理客户端读操作,就需要把 io-threads-do-reads 配置项设为 yes。
  3. 客户端没有被暂停。
  4. processEventsWhileBlokced 函数没有在执行。
  5. 客户端现有标识不能有 CLIENT_MASTER、CLIENT_SLAVE 和 CLIENT_PENDING_READ,前面两个常量是关于主从的,这里先不讨论,CLIENT_PENDING_READ前面有提到过。
/* Return 1 if we want to handle the client read later using threaded I/O.
 * This is called by the readable handler of the event loop.
 * As a side effect of calling this function the client is put in the
 * pending read clients and flagged as such. */
int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !clientsArePaused() &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

如果五个条件都满足的话,就把客户端的状态设置为CLIENT_PENDING_READ,并把当前客户端添加到server.clients_pending_read列表中。

也就是说当Redis接收到客户端请求时,如果I/O线程被激活,并不会直接读取命令,而是把client记为CLIENT_PENDING_READ,并把client添加到“等待被读取客户端”clients_pending_read列表中就直接返回了。这个操作是在主线程中进行的。
分析Redis命令处理过程中有提到,Redis会在一个循环中接收客户端请求,在阻塞等待客户端请求到来之前,会调用beforesleep函数,进而调用handleClientsWithPendingReadsUsingThreads函数来处理等待被读取的客户端,该函数主要逻辑分为四步:

  1. 判定 IO 线程是否激活,以及用户是否设置了 Redis 可以用 IO 线程处理待读客户端,如果不满足条件直接返回,这两个判断和postponeClientRead函数中的第1、2个条件相同。
  2. 取出等待被读取的客户端,以轮询的方式分配给各个I/O线程。
  3. 主线程把自己该读取的客户端中的命令,先读取、解析完。这里要说明的一点就是主线程就是0号线程也会参与I/O操作,并且读取的是io_threads_list[0]中的元素。
  4. 当I/O线程读取、解析完成之后,执行server.clients_pending_read列表中所有的客户端命令。
int handleClientsWithPendingReadsUsingThreads(void) {
    //1. 判定 IO 线程是否激活,以及用户是否设置了 Redis 可以用 IO 线程处理待读客户端
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    ......

    //2. 取出等待被读取的客户端,以轮询的方式分配给各个I/O线程。
    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    ......
    //3.主线程把自己该读取的客户端中的命令,先读取、解析完
    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);
    //等待I/O线程解析完所有的命令
    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");
    //4.执行所有的命令
    /* Run the list of clients again to process the new buffers. */
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        ......
        //执行所有命令
        if (processPendingCommandsAndResetClient(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
             * processing the client later. So we just go
             * to the next. */
            continue;
        }
        ......
    }

    ......
    return processed;
}

这里再重新说一下readQueryFromClient函数,刚开始该函数只是把客户端添加到全局列表中,并标记client.flags 为 CLIENT_PENDING_READ,就直接返回了。这个时候再进入这个函数,就会进入后面的读取逻辑,进而调用processInputBuffer解析命令。这一部分逻辑可以参考https://www.jianshu.com/p/0323fc06a36f

而processInputBuffer函数的最后,会判断当前客户端client.flags是否为CLIENT_PENDING_READ,如果是的话,就把该客户端标识设置为CLIENT_PENDING_COMMAND,即解析完客户端的命令,就直接返回了,并没有执行客户端的命令

/* If we are in the context of an I/O thread, we can't really
* execute the command here. All we can do is to flag the client
* as one that needs to process the command. */
if (c->flags & CLIENT_PENDING_READ) {
    c->flags |= CLIENT_PENDING_COMMAND;
    break;
}

2.4. 写回客户端

在2.3中分析的handleClientsWithPendingReadsUsingThreads函数中,当所有客户端的命令都解析完成以后,循环遍历所有的客户端,调用processPendingCommandsAndResetClient函数执行客户端命令,而processPendingCommandsAndResetClient会进而调用processCommand(在server.c文件中)函数来处理客户端命令,处理完客户端命令后,调用addReply函数(在networking.c文件中)写回客户端。

而addReply函数一开始,会调用 prepareClientToWrite 函数,来判断是否使用I/O线程来写回客户端:

void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;
    ...
}

prepareClientToWrite函数前面一些主从的判断这里先忽略,在最后会调用 clientHasPendingReplies 函数,判断当前客户端是否还有留存在输出缓冲区中的数据等待写回。在前面2.3最后解析完客户端命令之后,就会把client.flags设置为CLIENT_PENDING_COMMAND,所以第二个条件也会满足

如果没有的话,那么,prepareClientToWrite 就会调用 clientInstallWriteHandler 函数,再进一步判断能否推迟该客户端写操作

int prepareClientToWrite(client *c) {
    ......
    if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
            clientInstallWriteHandler(c);
    /* Authorize the caller to queue in the output buffer of this client. */
    return C_OK;
}

clientInstallWriteHandler函数中,忽略掉主从相关判断,就判断c->flags 是否为 CLIENT_PENDING_WRITE,而这个时候client.flags为CLIENT_PENDING_COMMAND,满足条件。把c->flags设置为CLIENT_PENDING_WRITE,并把当前client添加到全局变量server.clients_pending_write中,就返回了。

void clientInstallWriteHandler(client *c) {
    if (!(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        c->flags |= CLIENT_PENDING_WRITE;
        listAddNodeHead(server.clients_pending_write,c);
    }
}

prepareClientToWrite函数返回之后,addReply函数会进一步把写回给客户端的数据写到输出缓冲区中,这时候并没有真正的把数据写回给客户端

在2.3中提到Redis在循环接收客户端请求时,会调用beforesleep函数会调用handleClientsWithPendingReadsUsingThreads函数读取、解析客户端请求并执行客户端命令,然后beforeSleep函数会调用handleClientsWithPendingWritesUsingThreads函数来把缓冲区中的数据写回客户端。
handleClientsWithPendingWritesUsingThreads函数的执行逻辑大致可以分为四步:

  1. 判断是否需要使用I/O线程来写回客户端。
  2. 把待写客户端,按照轮询方式分配给 I/O 线程,添加到 io_threads_list 数组各元素中。
  3. 主 I/O 线程处理其待写客户端,并执行 while(1) 循环等待所有 I/O 线程完成处理。
  4. 再次检查是否还有缓冲区的数据未被写回客户端。这里是通过事件驱动框架来将缓冲区的数据写回客户端的,具体就不详细讨论了。
int handleClientsWithPendingWritesUsingThreads(void) {
    ......
    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but thejboring synchronous code. */
    //1. 判断是否需要使用I/O线程来写回客户端。
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }
    ......
    //2. 把待写客户端,按照轮询方式分配给 IO 线程
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        ......
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    //3. 主I/O线程处理其待写客户端
    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);
    //等待所有 I/O 线程完成处理
    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");

    /* Run the list of clients again to install the write handler where
     * needed. */
     //4.再次检查是否还有缓冲区的数据未被写回客户端
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        ......
    }
    listEmpty(server.clients_pending_write);

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}

2.5. 线程间同步

2.5.1 启动I/O线程

在前面2.1分析数据结构的时候,有一个互斥锁数组io_threads_mutex,数组里面的每一个元素就是一把I/O线程对应的互斥锁。

在2.2初始化I/O线程的时候,initThreadedIO函数中,循环创建io_threads_num个线程,在真正调用pthread_create创建线程之前,主线程会先调用pthread_mutex_lock函数,获取到这个I/O线程对应的锁,具体代码如下:

void initThreadedIO(void) {
    ......
    //3.初始化I/O线程
    for (int i = 0; i < server.io_threads_num; i++) {
        ......
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            ......
        }
        io_threads[i] = tid;
    }
}

而I/O线程执行的主函数中,首先会循环100w次,来等待I/O线程任务的到来,如果循环100w次还没有任务到来的时候,就会调用pthread_mutex_lock获取该I/O线程的互斥锁,而这把互斥锁在该线程创建之前,已经被主线程拿到了,所以I/O线程就阻塞在这里,等待被唤醒。就算I/O线程获取到互斥锁,也会立刻释放掉,让主线程可以随时停止I/O线程。

void *IOThreadMain(void *myid) {
    ......
    while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        /* Give the main thread a chance to stop this thread. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }
    }
}

在2.4写回客户端时候,handleClientsWithPendingWritesUsingThreads函数首先会进行两个判断,如下所示:

int handleClientsWithPendingWritesUsingThreads(void) {
    ......
    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but thejboring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!server.io_threads_active) startThreadedIO();
}

第一个if判断:如果设置的I/O线程数量为1(io_threads_num通过redis.conf配置),并且有必要停止I/O线程的话,就使用单线程的方式执行写回操作。

而stopThreadedIOIfNeeded会判断当前等待被处理的客户端数量pending,是否小于server.io_threads_num*2,如果小于的话,就会停止I/O线程,停止I/O线程同样是主线程获取该I/O线程的互斥锁。

int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);

    /* Return ASAP if IO threads are disabled (single threaded mode). */
    if (server.io_threads_num == 1) return 1;

    if (pending < (server.io_threads_num*2)) {
        if (server.io_threads_active) stopThreadedIO();
        return 1;
    } else {
        return 0;
    }
}

void stopThreadedIO(void) {
    ......
    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_lock(&io_threads_mutex[j]);
    server.io_threads_active = 0;
}

第二个if判断:判断server.io_threads_active的值,由于在初始化的时候,该值是被设为0的,并且走到这个判断,就说明客户端数量pending > server.io_threads_num*2,这个时候会调用startThreadedIO函数来启动I/O线程,startThreadxianedIO函数里面,其实就是释放掉每个I/O线程的互斥锁,这样I/O线程就可以获取到互斥锁继续执行了。

void startThreadxianedIO(void) {
    ......
    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_unlock(&io_threads_mutex[j]);
    server.io_threads_active = 1;
}

2.5.2 写回客户端同步

在前面2.4已经介绍过写回客户端函数handleClientsWithPendingWritesUsingThreads的执行逻辑,他是在主线程中执行的,而在2.2初始化时候也讨论过I/O线程执行的主函数IOThreadMain的主要逻辑。

在这两个线程的执行方法中,有两个变量是都会被这两个线程进行读写操作的,io_threads_listio_threads_pending,io_threads_list存放每个I/O线程需要处理的客户端列表,io_threads_pending保存每个I/O线程待处理客户端的数量。

下面通过一张表格来分析主线程和I/O线程是否会出现线程安全问题:


3.png

下面简单分析下上面这个表格:

  • T1、T2时刻对应了I/O线程的创建,即2.5.1讨论的过程
  • T3、T4时刻,在I/O线程被启动之后,主线程首先会给io_threads_list添加client,而此时I/O线程在循环判断io_threads_pending是否等于0。
  • T5时刻主线程设置了I/O线程的操作类型为写操作。
  • T6时刻主线程设置了I/O线程需要处理的客户端数量之后,I/O线程才开始继续执行while循环里面后续操作。
  • T7时刻主线程循环判断io_threads_pending是否等于0,而这时I/O线程在写回客户端。
  • T8、T9时刻I/O线程清空客户端列表,并设置io_threads_pending = 0,主线程才退出循环继续执行后续操作。

分析读取客户端命令的同步操作,其实和上面本小节分析的差不多,因此不再累述了,如果有问题的话,可以留言一起讨论。

虽然两个线程都有对io_threads_list和io_threads_pending变量的读、写操作,但是在同一时刻,只有一个线程在写其中的一个变量,因此并不会出现线程安全问题,真是艺高人胆大啊,希望有朝一日我也能写出如此骚气的代码,继续加油吧。
参考资料:

  1. 极客时间专栏《Redis源码剖析与实战》.蒋德钧.2021
  2. 极客时间专栏《Redis核心技术与实战》.蒋德钧.2020
  3. Redis 6.0源码:https://github.com/redis/redis/tree/6.0/src

有关Redis6.0多I/O线程实现原理的更多相关文章

  1. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  2. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  3. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  4. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  5. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

  6. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

  7. ruby - 如何让Ruby捕获线程中的语法错误 - 2

    我正在尝试使用ruby​​编写一个双线程客户端,一个线程从套接字读取数据并将其打印出来,另一个线程读取本地数据并将其发送到远程服务器。我发现的问题是Ruby似乎无法捕获线程内的错误,这是一个示例:#!/usr/bin/rubyThread.new{loop{$stdout.puts"hi"abc.putsefsleep1}}loop{sleep1}显然,如果我在线程外键入abc.putsef,代码将永远不会运行,因为Ruby将报告“undefinedvariableabc”。但是,如果它在一个线程内,则没有错误报告。我的问题是,如何让Ruby捕获这样的错误?或者至少,报告线程中的错误?

  8. ruby - 如何在 ruby​​ 中运行后台线程? - 2

    我是ruby​​的新手,我认为重新构建一个我用C#编写的简单聊天程序是个好主意。我正在使用Ruby2.0.0MRI(Matz的Ruby实现)。问题是我想在服务器运行时为简单的服务器命令提供I/O。这是从示例中获取的服务器。我添加了使用gets()获取输入的命令方法。我希望此方法在后台作为线程运行,但该线程正在阻塞另一个线程。require'socket'#Getsocketsfromstdlibserver=TCPServer.open(2000)#Sockettolistenonport2000defcommandsx=1whilex==1exitProgram=gets.chomp

  9. ruby - Arrays Sets 和 SortedSets 在 Ruby 中是如何实现的 - 2

    通常,数组被实现为内存块,集合被实现为HashMap,有序集合被实现为跳跃列表。在Ruby中也是如此吗?我正在尝试从性能和内存占用方面评估Ruby中不同容器的使用情况 最佳答案 数组是Ruby核心库的一部分。每个Ruby实现都有自己的数组实现。Ruby语言规范只规定了Ruby数组的行为,并没有规定任何特定的实现策略。它甚至没有指定任何会强制或至少建议特定实现策略的性能约束。然而,大多数Rubyist对数组的性能特征有一些期望,这会迫使不符合它们的实现变得默默无闻,因为实际上没有人会使用它:插入、前置或追加以及删除元素的最坏情况步骤复

  10. ruby - "public/protected/private"方法是如何实现的,我该如何模拟它? - 2

    在ruby中,你可以这样做:classThingpublicdeff1puts"f1"endprivatedeff2puts"f2"endpublicdeff3puts"f3"endprivatedeff4puts"f4"endend现在f1和f3是公共(public)的,f2和f4是私有(private)的。内部发生了什么,允许您调用一个类方法,然后更改方法定义?我怎样才能实现相同的功能(表面上是创建我自己的java之类的注释)例如...classThingfundeff1puts"hey"endnotfundeff2puts"hey"endendfun和notfun将更改以下函数定

随机推荐