草庐IT

[源码阅读]-Redis核心事件流程

xy1997 2023-03-28 原文

Redis核心流程

Notion版本

1.Redis核心流程中的重要数据结构

struct redisServer {
    int port; 
    int fd; 
    redisDb *db;
    aeEventLoop *el;
    list *clients;
    ...
};
typedef struct redisClient {
    int fd;
    redisDb *db;
    sds querybuf;
    list *reply;
} redisClient;
typedef struct redisDb {
    dict *dict;
    dict *expires;
    int id;
} redisDb;
typedef struct redisObject {
    void *ptr;
    int type;
    int refcount;
} robj;
typedef struct aeEventLoop {
    long long timeEventNextId;
    aeFileEvent *fileEventHead;
    aeTimeEvent *timeEventHead;
    int stop;
} aeEventLoop;

基于以上数据结构,Redis就可以构建核心的Server与client的交互流程

2.Redis核心流程解析

不像一些科学计算的程序,运行一次就可以产出所有结果。Redis功能是通过不断地对外界事件进行响应实现的。这种类型的程序,一般都存在事件循环线程,不断地响应,并处理外界事件。

int main(int argc, char **argv) {
    initServerConfig();
		...
    initServer();
    if (server.daemonize) daemonize();
    redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
		...
    if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
        acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
    redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}
// ...表示略去的不重要的代码

在main函数入口中,主要做了三件事:

  • 初始化服务器配置,并初始化服务器
  • 使用aeCreateFileEvent函数在服务器结构的事件循环el中注册了事件,事件的回调函数为acceptHandler。这个函数会在server.fd就绪的时候被调用,进行accept行为,建立对应的redisClient资源。(具体细节会在后面进行介绍)
  • 开启aeMain()事件循环,处理事件。在redis-1.0的版本中,使用select来对网络I/O行为进行多路复用,当有就绪事件时,就调用事先在事件中注册的回调函数进行处理。

我们来看看acceptHandler函数中的逻辑:

static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
		...
    cfd = anetAccept(server.neterr, fd, cip, &cport);
		...
    if ((c = createClient(cfd)) == NULL) {
		...
    }
		...
    server.stat_numconnections++;
}

acceptHandler函数主要为客户端连接建立了一个redisClient对象,用来管理后续的请求与响应。那么createClient是如何创建redisClient的呢?

下面是createClient函数的源码:

static redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(*c));

    anetNonBlock(NULL,fd);
    anetTcpNoDelay(NULL,fd);
    if (!c) return NULL;
    selectDb(c,0);
    c->fd = fd;
    c->querybuf = sdsempty();
    c->argc = 0;
    c->argv = NULL;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->lastinteraction = time(NULL);
    c->authenticated = 0;
    c->replstate = REDIS_REPL_NONE;
    if ((c->reply = listCreate()) == NULL) oom("listCreate");
    listSetFreeMethod(c->reply,decrRefCount);
    listSetDupMethod(c->reply,dupClientReplyValue);
    if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
        readQueryFromClient, c, NULL) == AE_ERR) {
        freeClient(c);
        return NULL;
    }
    if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
    return c;
}

在createClient中,首先会设置网络的I/O类型,以及一些TCP参数(说明连接是以TCP的方式进行的)。而后,会选择与客户端相连的数据库,并对redisClient中的一些属性进行初始化。最后会使用redisClient的句柄fd创建一个新的ae可读事件,在这个事件中,使用readQueryFromClient作为回调函数。最后,新建的这个redisClient会被添加到redisServer的clients链表中。

readQueryFromClient做了什么事呢?

static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    char buf[REDIS_IOBUF_LEN];
    int nread;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    nread = read(fd, buf, REDIS_IOBUF_LEN);
		...

again:
    if (c->bulklen == -1) {
        /* Read the first line of the query */
        char *p = strchr(c->querybuf,'\n');
        size_t querylen;

        if (p) {
						...
            if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
            return;
        } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
            redisLog(REDIS_DEBUG, "Client protocol error");
            freeClient(c);
            return;
        }
    }
		...
}

在readQueryFromClient函数中,会执行来自客户端buffer中的命令。并且,该事件的生命周期与响应的redisClient生命周期是一致的(被freeClient函数所关闭),当遇到错误或客户端请求关闭时,才会注销该事件。其余情况,会在每次调用之后,仍存在于事件循环el中。我们可以看到,真正用来执行命令的是processCommand函数

在processCommand函数中,有以下逻辑:

static int processCommand(redisClient *c) {
    struct redisCommand *cmd;
		...
    cmd = lookupCommand(c->argv[0]->ptr);
		...
    /* Exec the command */
    dirty = server.dirty;
    cmd->proc(c);
    if (server.dirty-dirty != 0 && listLength(server.slaves))
        replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
    if (listLength(server.monitors))
        replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
    server.stat_numcommands++;

    /* Prepare the client for the next command */
    if (c->flags & REDIS_CLOSE) {
        freeClient(c);
        return 0;
    }
    resetClient(c);
    return 1;
}

这一部分包含一个查找命令的逻辑以及执行命令的逻辑。所有的命令都被放在cmdTable[]中。

我们以get命令为例,看看get命令的proc函数getCommand都做了什么事情:

static void getCommand(redisClient *c) {
    robj *o = lookupKeyRead(c->db,c->argv[1]);

    if (o == NULL) {
        addReply(c,shared.nullbulk);
    } else {
        if (o->type != REDIS_STRING) {
            addReply(c,shared.wrongtypeerr);
        } else {
            addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
            addReply(c,o);
            addReply(c,shared.crlf);
        }
    }
}

先从db中查找对应的redisObject,并且通过addReply方法加入到响应中。那么addReply做了什么事情呢?

static void addReply(redisClient *c, robj *obj) {
    if (listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c, NULL) == AE_ERR) return;
    if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
    incrRefCount(obj);
}

可以看到,在addReply中生成了一个ae可写事件,并注册了名为sendReplyToClient的回调函数。

sendReplyToClient是redisServer解析query,并响应query结果链路中的最后一环:

static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    if (server.glueoutputbuf && listLength(c->reply) > 1)
        glueReplyBuffersIfNeeded(c);
    while(listLength(c->reply)) {
		    ...
        nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
				...
    }
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            redisLog(REDIS_DEBUG,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}

这个函数负责不停的将redisClient.reply中的响应写到客户端。直到没有reply可写之后,调用aeDeleteFileEvent来删除这个事件。(这个事件的生命周期随着一次调用响应结束而结束)。3

3. 总结

看到这里,大家可以发现,redis的核心ae循环处理流程中,始终只有一个主线程在运行。通过事件驱动的方式,处理相应的事件。同时,使用select多路复用来快速响应I/O事件,达到了很高的效率。

参考

  1. redis-1.0 SOURCE-CODE
  2. <Go手写Redis>系列

有关[源码阅读]-Redis核心事件流程的更多相关文章

  1. ruby - 寻找通过阅读代码确定编程语言的ruby gem? - 2

    几个月前,我读了一篇关于ruby​​gem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题:

  2. ruby-on-rails - 事件管理员日期过滤器日期格式自定义 - 2

    是否有简单的方法来更改默认ISO格式(yyyy-mm-dd)的ActiveAdmin日期过滤器显示格式? 最佳答案 您可以像这样为日期选择器提供额外的选项,而不是覆盖js:=f.input:my_date,as::datepicker,datepicker_options:{dateFormat:"mm/dd/yy"} 关于ruby-on-rails-事件管理员日期过滤器日期格式自定义,我们在StackOverflow上找到一个类似的问题: https://s

  3. UE4 源码阅读:从引擎启动到Receive Begin Play - 2

    一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame

  4. ruby-on-rails - 事件记录 : Select max of limit - 2

    我正在尝试将以下SQL查询转换为ActiveRecord,它正在融化我的大脑。deletefromtablewhereid有什么想法吗?我想做的是限制表中的行数。所以,我想删除少于最近10个条目的所有内容。编辑:通过结合以下几个答案找到了解决方案。Temperature.where('id这给我留下了最新的10个条目。 最佳答案 从您的SQL来看,您似乎想要从表中删除前10条记录。我相信到目前为止的大多数答案都会如此。这里有两个额外的选择:基于MurifoX的版本:Table.where(:id=>Table.order(:id).

  5. ruby-on-rails - 事件管理员和自定义方法 - 2

    这是我在ActiveAdmin中的自定义页面ActiveAdmin.register_page"Settings"doaction_itemdolink_to('Importprojects','settings/importprojects')endcontentdopara"Text"endcontrollerdodefimportprojectssystem"rakedataspider:import_projects_ninja"para"OK"endendend我想做的是,当我单击“导入项目”按钮时,我想在Controller中执行rake任务。但是我无法访问该方法。可能是什

  6. ruby-on-rails - 在不重新查询数据库的情况下重新排序 Rails 中的事件记录? - 2

    例如,假设我有一个名为Products的模型,并且在ProductsController中,我有以下代码用于product_listView以显示已排序的产品。@products=Product.order(params[:order_by])让我们想象一下,在product_listView中,用户可以使用下拉菜单按价格、评级、重量等进行排序。数据库中的产品不会经常更改。我很难理解的是,每次用户选择新的order_by过滤器时,rails是否必须查询,或者rails是否能够以某种方式缓存事件记录以在服务器端重新排序?有没有一种方法可以编写它,以便在用户排序时rails不会重新查询结果

  7. ruby-on-rails - Ruby 长时间运行的进程对队列事件使用react - 2

    我有一个将某些事件写入队列的Rails3应用。现在我想在服务器上创建一个服务,每x秒轮询一次队列,并按计划执行其他任务。除了创建ruby​​脚本并通过cron作业运行它之外,还有其他稳定的替代方案吗? 最佳答案 尽管启动基于Rails的持久任务是一种选择,但您可能希望查看更有序的系统,例如delayed_job或Starling管理您的工作量。我建议不要在cron中运行某些东西,因为启动整个Rails堆栈的开销可能很大。每隔几秒运行一次它是不切实际的,因为Rails上的启动时间通常为5-15秒,具体取决于您的硬件。不过,每天这样做几

  8. ruby-on-rails - 使用 Rails 事件记录获取二级模型 - 2

    我有一个帖子属于城市的关系,城市又属于一个州,例如:classPost现在我想找到所有帖子及其所属的城市和州。我编写了以下查询来获取带有城市的帖子,但不知道如何在同一查找器中获取带有城市的相应州:@post=Post.find:all,:include=>[:city]感谢任何帮助。谢谢。 最佳答案 Post.all(:include=>{:city=>:state}) 关于ruby-on-rails-使用Rails事件记录获取二级模型,我们在StackOverflow上找到一个类似的问

  9. ruby-on-rails - 在 irb 中阅读文档 - 2

    我怀念ipython的一件事是它有一个?为特定功能挖掘文档的运算符。我知道ruby​​有一个类似的命令行工具,但是我在irb中调用它非常不方便。ruby/irb有类似的东西吗? 最佳答案 Pry是IPython的Ruby版本,它支持?命令来查找有关方法的文档,但语法略有不同:pry(main)>?File.dirnameFrom:file.cinRubyCore(CMethod):Numberoflines:6visibility:publicsignature:dirname()Returnsallcomponentsofthef

  10. ruby - 在没有数据库的情况下伪造一个事件记录模型 - 2

    我觉得我错过了什么。我正在编写一个ruby​​gem,它允许与事件记录进行交互,作为其主要功能的附加功能。在为其编写测试用例时,我需要能够指定虚拟事件记录模型来测试此功能。如果我可以获得一个事件记录模型的实例,它不需要与数据库的任何连接,可以有关系,所有这些东西,但不需要我在数据库中设置表,那就太棒了。我对测试还很陌生,在Rails测试之外我也很陌生,但似乎我应该能够相当轻松地完成类似的事情,但我什么也没找到。谁能告诉我我错过了什么?我看过工厂、制造商、固定装置,所有这些似乎都想达到目标。人们如何在您只需要AR对象进行测试的地方测试gem? 最佳答案

随机推荐