草庐IT

【图解源码】Zookeeper3.7源码剖析,Session的管理机制,Leader选举投票规则,集群数据同步流程

博学谷狂野架构师 2023-03-28 原文

Zookeeper3.7源码剖析

能力目标

  • 掌握Zookeeper中Session的管理机制
  • 能基于Client进行Debug测试Session创建/刷新操作
  • 能搭建Zookeeper集群源码配置
  • 掌握集群环境下Leader选举启动过程
  • 能说出Zookeeper选举过程中的概念
  • 能说出Zookeeper选举投票规则
  • 能画出Zookeeper集群数据同步流程

1 Session源码分析

客户端创建Socket连接后,会尝试连接,如果连接成功成功会调用到primeConnection方法用来发送ConnectRequest连接请求,这里便是设置session会话 ,关于客户端创建会话我们就不在这里做讲解了,我们直接讲解服务端Session会话处理流程。

1.1 服务端Session属性分析

Zookeeper服务端会话操作如下图:

在服务端通过SessionTrackerImplExpiryQueue来保存Session会话信息。

SessionTrackerImpl有以下属性:

1:sessionsById 用来存储ConcurrentHashMap<Long, SessionImpl> {sessionId:SessionImpl} 2:sessionExpiryQueue ExpiryQueue<SessionImpl>失效队列
3:sessionsWithTimeout ConcurrentMap<Long, Integer>存储的是{sessionId: sessionTimeout} 
4:nextSessionId 下一个sessionId

ExpiryQueue失效队列有以下属性:

1:elemMap ConcurrentHashMap<E, Long> 存储的是{SessionImpl: newExpiryTime} Session实例对象,失效时间。
2:expiryMap ConcurrentHashMap<Long, Set<E>>存储的是{time: set<SessionImp>} 失效时间,当前失效时间的Session对象集合。
3:nextExpirationTime 下一次失效时间 {(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval 当前系统时间毫秒值ms=System.nanoTime() / 1000000。 nextExpirationTime=当前系统时间毫秒值+expirationInterval(失效间隔)。
4:expirationInterval 失效间隔,默认是10s,可以通过sessionlessCnxnTimeout修改。即是通过配置文件的tickTime修改。

1.2 Session创建

我们接着上一章的案例继续分析,假如客户端发起请求后,后端如何识别是第一次创建请求?在之前的案例源码NIOServerCnxn.readPayload()中有所体现,NIOServerCnxn.readPayload()部分关键源码如下:

此时如果initialized=false,表示第一次连接 需要创建Session(createSession),此处调用readConnectRequest()后,在readConnectRequest()方法中会将initialized设置为true,只有在处理完连接请求之后才会把initialized设置为true,才可以处理客户端其他命令。

上面方法还调用了processConnectRequest处理连接请求, processConnectRequest 第一次从请求中获取的sessionId=0,此时会把创建Session作为一个业务,会调用createSession()方法,processConnectRequest 方法部分关键代码如下:

创建会话调用createSession(),该方法会首先创建一个sessionId,并把该sessionId作为会话ID创建一个创建session会话的请求,并将该请求交给业务链作为一个业务处理,createSession()源码如下:

上面方法用到的sessionTracker.createSession(timeout)做了2个操作分别是创建sessionId和配置sessionId的跟踪信息,方法源码如下:

会话信息的跟踪其实就是将会话信息添加到队列中,任何地方可以根据会话ID找到会话信息,trackSession方法实现了Session创建、Session队列存储、Session过期队列存储,trackSession方法源码如下:

PrepRequestProcessorrun方法中调用pRequest2Txn,关键代码如下:

SyncRequestProcessor对txn(创建session的操作)进行持久化,在FinalRequestProcessor会对Session进行提交,其实就是把Session的ID和Timeout存到sessionsWithTimeout中去。

由于FinalRequestProcessor中调用链路太复杂,我们把调用链路写出来,大家可以按照这个顺序跟踪:

1:FinalRequestProcessor.applyRequest()
		方法代码:ProcessTxnResult rc = zks.processTxn(request);
		
2:ZooKeeperServer.processTxn(org.apache.zookeeper.server.Request)
		方法代码:processTxnForSessionEvents(request, hdr, request.getTxn());

上面调用链路中processTxnForSessionEvents(request, hdr, request.getTxn());方法代码如下:

上面方法主要处理了OpCode.createSession并且将sessionId、TimeOut提交到sessionsWithTimeout中,而提交到sessionsWithTimeout的方法SessionTrackerImpl.commitSession()代码如下:

1.3 Session刷新

服务端无论接受什么请求命令(增删或ping等请求)都会更新Session的过期时间 。我们做增删或者ping命令的时候,都会经过RequestThrottlerRequestThrottler的run方法中调用zks.submitRequestNow(),而zks.submitRequestNow(request)中调用了touch(si.cnxn);,该方法源码如下:

touchSession()方法更新sessionExpiryQueue失效队列中的失效时间,源码如下:

update()方法会在当前时间的基础上增加timeout,并更新失效时间为newExpiryTime,关键源码如下:

1.4 Session过期

SessionTrackerImpl是一个线程类,继承了ZooKeeperCriticalThread,我们可以看它的run方法,它首先获取了下一个会话过期时间,并休眠等待会话过期时间到期,然后获取过期的客户端会话集合并循环关闭,源码如下:

上面方法中调用了sessionExpiryQueue.poll(),该方法代码主要是获取过期时间对应的客户端会话集合,源码如下:

上面的setSessionClosing()方法其实是把Session会话的isClosing状态设置为了true,方法源码如下:

而让客户端失效的方法expirer.expire(s);其实也是一个业务操作,主要调用了ZooKeeperServer.expire()方法,而该方法获取SessionId后,又创建了一个OpCode.closeSession的请求,并交给业务链处理,我们查看ZooKeeperServer.expire()方法源码如下:

PrepRequestProcessor.pRequest2Txn()方法中OpCode.closeSession操作里最后部分代理明确将会话Session的isClosing设置为了true,源码如下:

业务链处理对象FinalRequestProcessor.processRequest()方法调用了ZooKeeperServer.processTxn(),并且在processTxn()方法中执行了processTxnForSessionEvents,而processTxnForSessionEvents()方法正好移除了会话信息,方法源码如下:

移除会话的方法SessionTrackerImpl.removeSession()会移除会话ID以及过期会话对象,源码如下:

1.5 Zookeeper会话测试

为了让Zookeeper的会话理解更深刻,我们对会话流程做一个测试,首先测试会话创建,再测试会话刷新。

1)会话创建测试

我们打开NIOServerCnxn.readPayload()方法,跟踪首次创建会话,调试情况如下:

此时会建立远程连接并创建SessionID,我们调试到NIOServerCnxn.readConnectRequest()方法,此时建立链接,并且得到的sessionId=0。

当sessionId=0时,会执行Session创建,Session创建会调用SessionTrackerImpl.createSession()方法实现会话创建,并将会话存入跟踪队列,DEBUG测试如下:

会话创建代码如下:

跟踪测试后,控制台输出如下信息:

AcceptThread----------链接服务的IP:127.0.0.1
1:会话未连接,准备首次连接会话.....
2:建立远程连接......
2:第1次连接的sessionId=0
使用SessionTrackerImpl创建会话,并将会话加入跟踪队列中
3:sessionId=0,此时创建sessionId=72061099907219458

2)会话刷新测试

我们执行get /zookeeper指令,然后首先跟踪到RequestThrottler.run()方法,执行如下:

执行程序到达ZooKeeperServer.touch(),即将开始准备刷新会话了,我们测试效果如下:

调用SessionTrackerImpl.touchSession()的时候会先判断会话是否为空、会话是否已经关闭,如果都没有,才执行刷新会话操作,DEBUG跟踪如下:

刷新会话其实就是会话时间增加,增加会话时间DEBUG跟踪如下:

测试后效果如下:

a.当前请求并未过期,不需要删除,准备刷新会话
b.准备调用SessionTrackerImpl.touchSession()刷新会话
c.会话不为空,会话也未关闭,准备调用updateSessionExpiry()刷新会话
d.剩余过期时间:54572178,增加过期时间:30000,刷新会话后过期时间:54604000

2 Zookeeper集群启动流程

我们先搭建Zookeeper集群,再来分析选举算法。

2.1 Zookeeper集群配置

如上图:

1:创建zoo1.cfg、zoo2.cfg、zoo3.cfg
2:创建zkdata1、zkdata2、zkdata3
3:创建3个myid,值分别为1、2、3

配置3个启动类,如下图:

2.2 集群启动流程分析

如上图,上图是Zookeeper单机/集群启动流程,每个细节所做的事情都在上图有说明,我们接下来按照流程图对源码进行分析。

程序启动,运行流程启动集群模式,如下图:

quorumPeer.start()启动服务,如下代码:

quorumPeer.start()方法代码如下:

quorumPeer.start()方法启动的主要步骤:

1:loadDataBase()加载数据。
2:startServerCnxnFactory 用来开启acceptThread、SelectorThread和workerPool线程池。
3:开启Leader选举startLeaderElection。
4:开启JVM监控线程startJvmPauseMonitor。
5:调用父类super.start();进行Leader选举。

startLeaderElection()开启Leader选举方法做了2件事,首先创建初始化选票选自己,接着创建选举投票方式,源码如下:

createElectionAlgorithm()创建选举算法只有第3种,其他2种均已废弃,方法源码如下:

这个方法创建了以下三个对象:

①、创建QuorumCnxManager对象

②、QuorumCnxManager.Listener

③、FastLeaderElection

3 Zookeeper集群Leader选举

3.1 Paxos算法介绍

Zookeeper选举主要依赖于FastLeaderElection算法,其他算法均已淘汰,但FastLeaderElection算法又是典型的Paxos算法,所以我们要先学习下Paxos算法,这样更有助于掌握FastLeaderElection算法。

1)Paxos介绍

分布式事务中常见的事务模型有2PC和3PC,无论是2PC提交还是3PC提交都无法彻底解决分布式的一致性问题以及无法解决太过保守及容错性不好。Google Chubby的作者Mike Burrows说过,世上只有一种一致性算法,那就是Paxos,所有其他一致性算法都是Paxos算法的不完整版。Paxos算法是公认的晦涩,很难讲清楚,但是工程上也很难实现,所以有很多Paxos算法的工程实现,如Chubby, Raft,ZAB,微信的PhxPaxos等。这一篇会介绍这个公认为难于理解但是行之有效的Paxos算法。Paxos算法是莱斯利·兰伯特(Leslie Lamport)1990年提出的一种基于消息传递的一致性算法,它曾就此发表了《The Part-Time Parliament》,《Paxos Made Simple》,由于采用故事的方式来解释此算法,感觉还是很难理解。

2)Paxos算法背景
Paxos算法是基于消息传递且具有高度容错特性的一致性算法,是目前公认的解决分布式一致性问题最有效的算法之一,其解决的问题就是在分布式系统中如何就某个值(决议)达成一致。
面试的时候:不要把这个Paxos算法达到的目的和分布式事务联系起来,而是针对Zookeeper这样的master-slave集群对某个决议达成一致,也就是副本之间写或者leader选举达成一致。我觉得这个算法和狭义的分布式事务不是一样的。
在常见的分布式系统中,总会发生诸如机器宕机或网络异常(包括消息的延迟、丢失、重复、乱序,还有网络分区)(也就是会发生异常的分布式系统)等情况。Paxos算法需要解决的问题就是如何在一个可能发生上述异常的分布式系统中,快速且正确地在集群内部对某个数据的值达成一致。也可以理解成分布式系统中达成状态的一致性。

3)Paxos算法理解

Paxos 算法是分布式一致性算法用来解决一个分布式系统如何就某个值(决议)达成一致的问题。一个典型的场景是,在一个分布式数据库系统中,如果各节点的初始状态一致,每个节点都执行相同的操作序列,那么他们最后能得到一个一致的状态。为保证每个节点执行相同的命令序列,需要在每一条指令上执行一个”一致性算法”以保证每个节点看到的指令一致。
分布式系统中一般是通过多副本来保证可靠性,而多个副本之间会存在数据不一致的情况。所以必须有一个一致性算法来保证数据的一致,描述如下:
  假如在分布式系统中初始是各个节点的数据是一致的,每个节点都顺序执行系列操作,然后每个节点最终的数据还是一致的。
  Paxos算法就是解决这种分布式场景中的一致性问题。对于一般的开发人员来说,只需要知道paxos是一个分布式选举算法即可。多个节点之间存在两种通讯模型:共享内存(Shared memory)、消息传递(Messages passing),Paxos是基于消息传递的通讯模型的。

4)Paxos相关概念

在Paxos算法中,有三种角色:

  • Proposer
  • Acceptor
  • Learners

在具体的实现中,一个进程可能同时充当多种角色。比如一个进程可能既是Proposer又是Acceptor又是Learner。Proposer负责提出提案,Acceptor负责对提案作出裁决(accept与否),learner负责学习提案结果。
还有一个很重要的概念叫提案(Proposal)。最终要达成一致的value就在提案里。只要Proposer发的提案被Acceptor接受(半数以上的Acceptor同意才行),Proposer就认为该提案里的value被选定了。Acceptor告诉Learner哪个value被选定,Learner就认为那个value被选定。只要Acceptor接受了某个提案,Acceptor就认为该提案里的value被选定了。
为了避免单点故障,会有一个Acceptor集合,Proposer向Acceptor集合发送提案,Acceptor集合中的每个成员都有可能同意该提案且每个Acceptor只能批准一个提案,只有当一半以上的成员同意了一个提案,就认为该提案被选定了。

3.2 QuorumPeer工作流程

QuorumCnxManager:每台服务器在启动的过程中,会启动一个QuorumPeer,负责各台服务器之间的底层Leader选举过程中的网络通信对应的类就是QuorumCnxManager

Zookeeper对于每个节点QuorumPeer的设计相当的灵活,QuorumPeer主要包括四个组件:客户端请求接收器(ServerCnxnFactory)、数据引擎(ZKDatabase)、选举器(Election)、核心功能组件(Leader/Follower/Observer)。

1:ServerCnxnFactory负责维护与客户端的连接(接收客户端的请求并发送相应的响应);(1001行)
2:ZKDatabase负责存储/加载/查找数据(基于目录树结构的KV+操作日志+客户端Session);(129行)
3:Election负责选举集群的一个Leader节点;(998行)
4:Leader/Follower/Observer确认是QuorumPeer节点应该完成的核心职责;(1270行)

QuorumPeer工作流程比较复杂,如下图:

QuorumPeer工作流程:

1:初始化配置
2:加载当前存在的数据
3:启动网络通信组件
4:启动控制台
5:开启选举协调者,并执行选举(这个过程是会持续,并不是一次操作就结束了)

3.3 QuorumCnxManager源码分析

QuorumCnxManager内部维护了一系列的队列,用来保存接收到的、待发送的消息以及消息的发送器,除接收队列以外,其他队列都按照SID分组形成队列集合,如一个集群中除了自身还有3台机器,那么就会为这3台机器分别创建一个发送队列,互不干扰。

QuorumCnxManager.Listener :为了能够相互投票,Zookeeper集群中的所有机器都需要建立起网络连接。QuorumCnxManager在启动时会创建一个ServerSocket来监听Leader选举的通信端口。开启监听后,Zookeeper能够不断地接收到来自其他服务器地创建连接请求,在接收到其他服务器地TCP连接请求时,会进行处理。为了避免两台机器之间重复地创建TCP连接,Zookeeper只允许SID大的服务器主动和其他机器建立连接,否则断开连接。在接收到创建连接请求后,服务器通过对比自己和远程服务器的SID值来判断是否接收连接请求,如果当前服务器发现自己的SID更大,那么会断开当前连接,然后自己主动和远程服务器将连接(自己作为“客户端”)。一旦连接建立,就会根据远程服务器的SID来创建相应的消息发送器SendWorker和消息发送器RecvWorker,并启动。

QuorumCnxManager.Listener监听启动可以查看QuorumCnxManager.Listenerrun方法,源代码如下,可以断点调试看到此时监听的正是我们所说的投票端口:

上面是监听器,各个服务之间进行通信我们需要开启ListenerHandler线程,在QuorumCnxManager.Listener.ListenerHandler的run方法中有一个方法acceptConnections() 调用,该方法就是用于接受每次选举投票的信息,如果只有一个节点或者没有投票信息的时候,此时方法会阻塞,一旦执行选举,程序会往下执行,我们可以先启动1台服务,再启动第2台、第3台,此时会收到有客户端参与投票链接,程序会往下执行,源码如下:

我们启动2台服务,效果如下:

上面虽然能证明投票访问了当前监听的端口,但怎么知道是哪台服务呢?我们可以沿着receiveConnection()源码继续研究,源码如下:

receiveConnection()方法只是获取了数据流,并没做特殊处理,并且调用了handleConnection()方法,该方法源码如下:

通过网络连接获取数据sid,获取sid表示是哪一台连过来的,我们可以打印输出sid,测试输出如下数据:

参与投票的MyID=2
参与投票的MyID=3

3.4 FastLeaderElection算法源码分析

Zookeeper集群中,主要分为三者角色,而每一个节点同时只能扮演一种角色,这三种角色分别是:

(1)Leader 接受所有Follower的提案请求并统一协调发起提案的投票,负责与所有的Follower进行内部的数据交换(同步);

(2)Follower 直接为客户端提供服务并参与提案的投票,同时与Leader进行数据交换(同步);

(3)Observer 直接为客户端服务但并不参与提案的投票,同时也与Leader进行数据交换(同步);

FastLeaderElection 选举算法是标准的 Fast Paxos 算法实现,可解决 LeaderElection 选举算法收敛速度慢的问题。

创建FastLeaderElection 只需要new FastLeaderElection()即可,如下代码:

创建FastLeaderElection会调用starter()方法,该方法会创建sendqueuerecvqueue队列、Messenger对象,其中Messenger对象的作用非常关键,方法源码如下:

创建Messenger的时候,会创建WorkerSender并封装成wsThread线程,创建WorkerReceiver并封装成wrThread线程,看名字就很容易理解,wsThread用于发送数据,wrThread用于接收数据,Messenger创建源码如下:

创建完FastLeaderElection后接着会调用它的start()方法启动选举算法,代码如下:

启动选举算法会调用start()方法,start()方法如下:

public void start() {
    this.messenger.start();
}

上面会执行messager.start(),也就是如下方法,也就意味着wsThreadwrThread线程都将启动,源码如下:

void start() {
	this.wsThread.start();
	this.wrThread.start();
}

wsThreadWorkerSender封装而来,此时会调用WorkerSenderrun方法,run方法会调用process()方法,源码如下:

process方法调用了managertoSend方法,此时是把对应的sid作为了消息发送出去,这里其实是发送投票信息,源码如下:

void process(ToSend m) {
    ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData);
    manager.toSend(m.sid, requestBuffer);
}

投票可以投自己,也可以投别人,如果是选票选自己,只需要把投票信息添加到recvQueue中即可,源码如下:

WorkerReceiver.run方法中会从recvQueue中获取Message,并把发送给其他服务的投票封装到sendqueue队列中,交给WorkerSender发送处理,源码如下:

3.5 Zookeeper选举投票剖析

选举是个很复杂的过程,要考虑很多场景,而且选举过程中有很多概念需要理解。

3.5.1 选举概念

1)ZK服务状态:

public enum ServerState {
    //代表没有当前集群中没有Leader,此时是投票选举状态
    LOOKING,  
    //代表已经是伴随者状态
    FOLLOWING,
    //代表已经是领导者状态
    LEADING,
    //代表已经是观察者状态(观察者不参与投票过程)
    OBSERVING
}

2)服务角色:

//Learner 是随从服务和观察者的统称
public enum LearnerType {
    //随从者角色
    PARTICIPANT,
    //观察者角色
    OBSERVER
}

3)投票消息广播:

public static class Notification {
    int version;
    
    //被推荐leader的ID
     long leader;
    
      //被推荐leader的zxid
      long zxid;
     
     //投票轮次
     long electionEpoch;
     
     //当前投票者的服务状态 (LOOKING)
     QuorumPeer.ServerState state;
     //当前投票者的ID
     long sid;
     //QuorumVerifier作为集群验证器,主要完成判断一组server在
     //已给定的配置的server列表中,是否能够构成集群
     QuorumVerifier qv;
     
     //被推荐leader的投票轮次
     long peerEpoch;
    
}

4)选票模型:

public class Vote {
    //投票版本号,作为一个标识 
    private final int version;
    //当前服务的ID
    private final long id;
    //当前服务事务ID
    private final long zxid;
    //当前服务投票的轮次
    private final long electionEpoch;
    //被推举服务器的投票轮次
    private final long peerEpoch;
    //当前服务器所处的状态
    private final ServerState state;

}

5)消息发送对象:

public static class ToSend {
    //支持的消息类型
    enum mType {
        crequest, //请求
        challenge, //确认
        notification,//通知
        ack //确认回执
    }
   
    ToSend(mType type, long leader, long zxid, long electionEpoch, ServerState state, long sid, long peerEpoch, byte[] configData) {

        this.leader = leader;
        this.zxid = zxid;
        this.electionEpoch = electionEpoch;
        this.state = state;
        this.sid = sid;
        this.peerEpoch = peerEpoch;
        this.configData = configData;
    }

    /*
     * Proposed leader in the case of notification
     * 被投票推举为leader的服务ID 
     */ long leader;

    /*
     * id contains the tag for acks, and zxid for notifications
     * 
     */ long zxid;

    /*
     * Epoch
     * 投票轮次
     */ long electionEpoch;

    /*
     * Current state;
     * 服务状态
     */ QuorumPeer.ServerState state;

    /*
     * Address of recipient
     * 消息接收方服务ID
     */ long sid;

    /*
     * Used to send a QuorumVerifier (configuration info)
     */ byte[] configData = dummyData;

    /*
     * Leader epoch
     */ long peerEpoch;

}

3.5.2 选举过程

QuorumPeer本身是个线程,在集群启动的时候会执行quorumPeer.start();,此时会调用它重写的start()方法,最后会调用父类的start()方法,所以该线程会启动执行,因此会执行它的run方法,而run方法正是选举流程的入口,我们看run方法关键源码如下:

所有节点初始状态都为LOOKING,会进入到选举流程,选举流程首先要获取算法,获取算法的方法是makeLEStrategy(),该方法返回的是FastLeaderElection实例,核心选举流程是FastLeaderElection中的lookForLeader()方法。

/****
 * 获取选举算法
 */
@SuppressWarnings("deprecation")
protected Election makeLEStrategy() {
    return electionAlg;
}

lookForLeader()是选举过程的关键流程,源码分析如下:

上面多个地方都用到了过半数以上的方法hasAllQuorums()该方法用到了QuorumMaj类,代码如下:

QuorumMaj构造函数中体现了过半数以上的操作,代码如下:

3.5.3 投票规则

我们来看一下选票PK的方法totalOrderPredicate(),该方法其实就是Leader选举规则,规则有如下三个:

1:比较 epoche(zxid高32bit),如果其他节点的epoche比自己的大,选举 epoch大的节点(理由:epoch 表示年代,epoch越大表示数据越新)代码:(newEpoch > curEpoch);

2:比较 zxid, 如果epoche相同,就比较两个节点的zxid的大小,选举 zxid大的节点(理由:zxid 表示节点所提交事务最大的id,zxid越大代表该节点的数据越完整)代码:(newEpoch == curEpoch) && (newZxid > curZxid);

3:比较 serviceId,如果 epoch和zxid都相等,就比较服务的serverId,选举 serviceId大的节点(理由: serviceId 表示机器性能,他是在配置zookeeper集群时确定的,所以我们配置zookeeper集群的时候可以把服务性能更高的集群的serverId设置大些,让性能好的机器担任leader角色)代码 :(newEpoch == curEpoch) && ((newZxid == curZxid) && (newId > curId))。

源码如下:

4 Zookeeper集群数据同步

所有事务操作都将由leader执行,并且会把数据同步到其他节点,比如follower、observer,我们可以分析leader和follower的操作行为即可分析出数据同步流程。

4.1 Zookeeper同步流程说明

整体流程:

1:当角色确立之后,leader调用leader.lead();方法运行,创建一个接收连接的LearnerCnxAcceptor线程,在LearnerCnxAcceptor线程内部又建立一个阻塞的LearnerCnxAcceptorHandler线程等待Learner端的连接。Learner端以follower为例,follower调用follower.followLeader();方法首先查找leader的Socket服务端,然后建立连接。当follower建立连接后,leader端会建立一个LearnerHandler线程相对应,用来处理follower与leader的数据包传输。 

2:follower端封装当前zk服务器的Zxid和Leader.FOLLOWERINFO的LearnerInfo数据包发送给leader

3:leader端这时处于getEpochToPropose方法的阻塞时期,需要得到Learner端超过一半的服务器发送Epoch

4:getEpochToPropose解阻塞之后,LearnerHandler线程会把超过一半的Epoch与leader比较得到最新的newLeaderZxid,并封装成Leader.LEADERINFO包发送给Learner端

5:Learner端得到最新的Epoch,会更新当前服务器的Epoch。并把当前服务器所处的lastLoggedZxid位置封装成Leader.ACKEPOCH发送给leader

6:此时leader端处于waitForEpochAck方法的阻塞时期,需要得到Learner端超过一半的服务器发送EpochACK

7:当waitForEpochAck阻塞之后便可以在LearnerHandler线程内决定用那种方式进行同步。如果Learner端的lastLoggedZxid>leader端的,Learner端将会被删除多余的部分。如果小于leader端的,将会以不同方式进行同步 

8:leader端发送Leader.NEWLEADER数据包给Learner端(6、7步骤都是另开一个线程来发送这些数据包)

9:Learner端同步之后,会在一个while循环内处理各种leader端发送数据包,包括两阶段提交的Leader.PROPOSAL、Leader.COMMIT、Leader.INFORM等。在同步数据后会处理Leader.NEWLEADER数据包,然后发送Leader.ACK给leader端 

10:此时leader端处于waitForNewLeaderAck阻塞等待超过一半节点发送ACK。

我们回到QuorumPeer.run()方法,根据确认的不同角色执行不同操作展开分析。

4.2 Zookeeper Follower同步流程

Follower主要连接Leader实现数据同步,我们看看Follower做的事,我们仍然沿着QuorumPeer.run()展开学习,关键代码如下:

创建Follower的方法比较简单,代码如下:

我们看一下整个Follower在数据同步中做的所有操作follower.followLeader();,源码如下图:

上面源码中的follower.followLeader()方法主要做了如下几件事:

1:寻找Leader
2:和Leader创建链接
3:向Leader注册Follower,会将当前Follower节点信息发送给Leader节点
4:和Leader同步历史数据
5:读取Leader发送的数据包
6:同步Leader数据包

我们对follower.followLeader()调用的其他方法进行剖析,其中findLeader()是寻找当前Leader节点的,源代码如下:

followLeader()中调用了registerWithLeader(Leader.FOLLOWERINFO);该方法是向Leader注册Follower,会将当前Follower节点信息发送给Leader节点,Follower节点信息发给Leader是必须的,是Leader同步数据个基础,源码如下:

followLeader()中最后读取数据包执行同步的方法中调用了readPacket(qp);,这个方法就是读取Leader的数据包的封装,源码如下:

4.3 Zookeeper Leader同步流程

我们查看QuorumPeer.run()方法的LEADING部分,可以看到先创建了Leader对象,并设置了Leader,然后调用了leader.lead()leader.lead()是执行的核心业务流程,源码如下:

leader.lead()方法是Leader执行的核心业务流程,源码如下:

leader.lead()方法会执行如下几个操作:

1:从快照和事务日志中加载数据
2:创建一个线程,接收Follower/Observer的连接
3:等待超过一半的(Follower和Observer)连接,再继续往下执行程序
4:等待超过一半的(Follower和Observer)获取了新的epoch,并且返回了Leader.ACKEPOCH,再继续往下执行程序
5:等待超过一半的(Follower和Observer)进行数据同步成功,并且返回了Leader.ACK,再继续往下执行程序
6:数据同步完成,开启zkServer,并且同时开启请求调用链接收请求执行
7:进行一个死循环,每次休眠self.tickTime / 2,和对所有的(Observer/Follower)发起心跳检测
8:集群中没有过半Follower在集群中,调用shutdown关闭一些对象,重新选举

lead()方法中会创建LearnerCnxAcceptor,该对象是一个线程,主要用于接收followers的连接,这里加了CountDownLatch根据配置的同步的地址的数量(例如:server.2=127.0.0.1:12881:13881 配置同步的端口是12881只有一个),LearnerCnxAcceptor的run方法源码如下:

LearnerCnxAcceptor的run方法中创建了LearnerCnxAcceptorHandler对象,在接收到链接后,就会调用LearnerCnxAcceptorHandler,而LearnerCnxAcceptorHandler是一个线程,它的run方法中调用了acceptConnections()方法,源码如下:

acceptConnections()方法会在这里阻塞接收followers的连接,当有连接过来会生成一个socket对象。然后根据当前socket生成一个LearnerHandler线程 ,每个Learner者都会开启一个LearnerHandler线程,方法源码如下:

LearnerHandler.run 这里就是读取或写数据包与Learner交换数据包。如果没有数据包读取,则会阻塞当前方法ia.readRecord(qp, "packet");,源码如下:

我们再回到leader.lead()方法,其中调用了getEpochToPropose()方法,该方法是判断connectingFollowers发给leader端的Epoch是否过半,如果过半则会解阻塞,不过半会一直阻塞着,直到Follower把自己的Epoch数据包发送过来并符合过半机制,源码如下:

lead()方法中,当发送的Epoch过半之后,把当前zxid设置到zk,并等待EpochAck,关键源码如下:

waitForEpochAck()方法也会等待超过一半的(Follower和Observer)获取了新的epoch,并且返回了Leader.ACKEPOCH,才会解除阻塞,否则会一直阻塞。等待EpochAck解阻塞后,把得到最新的epoch更新到当前服务,设置当前leader节点的zab状态是SYNCHRONIZATION,方法源码如下:

lead()方法中还需要等待超过一半的(Follower和Observer)进行数据同步成功,并且返回了Leader.ACK,程序才会解除阻塞,如下代码:

上面所有流程都走完之后,就证明数据已经同步成功了,会执行startZkServer();

4.4 LearnerHandler数据同步操作

LearnerHandler线程是对应于Learner连接Leader端后,建立的一个与Learner端交换数据的线程。每一个Learner端都会创建一个 LearnerHandler线程。

我们详细讲解LearnerHandler.run()方法。

readRecord读取数据包 不断从learner节点读数据,如果没读到将会阻塞readRecord

如果数据包类型不是Leader.FOLLOWERINFO或Leader.OBSERVERINFO将会返回,因为咱们这里本身就是Leader节点,读数据肯定是读非Leader节点数据。

获取learnerInfoData来获取sid和版本信息。

获取followerInfo和lastAcceptedEpoch,信息如下:

把Leader.NEWLEADER数据包放入到queuedPackets,并向其他节点发送,源码如下:

本文由传智教育博学谷 - 狂野架构师教研团队发布
如果本文对您有帮助,欢迎关注和点赞;如果您有任何建议也可留言评论或私信,您的支持是我坚持创作的动力
转载请注明出处!

有关【图解源码】Zookeeper3.7源码剖析,Session的管理机制,Leader选举投票规则,集群数据同步流程的更多相关文章

  1. UE4 源码阅读:从引擎启动到Receive Begin Play - 2

    一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame

  2. 使用canal同步MySQL数据到ES - 2

    文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co

  3. ruby-on-rails - Rails 优雅地处理超时 session ? - 2

    使用rails4,ruby2。我在rails配置中为我的cookiesession设置了30分钟的超时时间。问题是,如果我转到表单,让session超时,然后提交表单,我会收到此ActionController::InvalidAuthenticityToken错误。如何在Rails中优雅地处理这个错误?比如说,重定向到登录屏幕? 最佳答案 在您的ApplicationController:rescue_fromActionController::InvalidAuthenticityTokendoredirect_tosome_p

  4. ruby-on-rails - 为什么在 Rails 5.1.1 中删除了 session 存储初始化程序 - 2

    我去了这个website查看Rails5.0.0和Rails5.1.1之间的区别为什么5.1.1不再包含:config/initializers/session_store.rb?谢谢 最佳答案 这是删除它的提交:Setupdefaultsessionstoreinternally,nolongerthroughanapplicationinitializer总而言之,新应用没有该初始化器,session存储默认设置为cookie存储。即与在该初始值设定项的生成版本中指定的值相同。 关于

  5. ruby - Sinatra session 未按预期持续 - 2

    我正在尝试使用Sinatra中的重定向和session在网站周围传递一些数据。这是一个简化的示例,使用PrettyPrint进行调试:require'pp'require'rubygems'require'sinatra'enable:sessionsget'/'dosession[:foo]='12345'puts'session1'ppsessionredirectto('/redir')endget'/redir'doputs'session2'ppsession'helloworld'end查看Thin的输出,我看到:>>Listeningon0.0.0.0:4567,CTRL

  6. elasticsearch源码关于TransportSearchAction【阶段三】 - 2

    1.回顾.TransportServicepublicclassTransportServiceextendsAbstractLifecycleComponentTransportService:方法:1publicfinalTextendsTransportResponse>voidsendRequest(finalTransport.Connectionconnection,finalStringaction,finalTransportRequestrequest,finalTransportRequestOptionsoptions,TransportResponseHandlerT>

  7. (附源码)vue3.0+.NET6实现聊天室(实时聊天SignalR) - 2

    参考文章搭建文章gitte源码在线体验可以注册两个号来测试演示图:一.整体介绍  介绍SignalR一种通讯模型Hub(中心模型,或者叫集线器模型),调用这个模型写好的方法,去发送消息。  内容有:    ①:Hub模型的方法介绍    ②:服务器端代码介绍    ③:前端vue3安装并调用后端方法    ④:聊天室样例整体流程:1、进入网站->调用连接SignalR的方法2、与好友发送消息->调用SignalR的自定义方法 前端通过,signalR内置方法.invoke()  去请求接口3、监听接受方法(渲染消息)通过new signalR.HubConnectionBuilder().on

  8. ruby - 如何强制 Rack :session + sinatra to read "rack.session" from params instead of cookies - 2

    我正在处理oauth1.0(twitter和flickr)。网站工作在80端口,oauth服务器工作在8080端口算法:向oauth服务器发送ajax请求以检查用户是否有有效的access_token如果用户没有access_token或access_token已过期,则打开授权窗口在oauth服务器的用户session中保存access_token发送分享数据到oauth服务器它使用sinatra+rack:session+rack::session::sequel+sqlite来存储session。它在每个响应中发送Set-Cookie:rack.session=id我正在使用2种

  9. ruby-on-rails - 如何编写 Rails 4 测试以使用 omniauth-google-oauth2 gem 创建 session ? - 2

    我正在尝试为使用omniauth-google-oauth2gem创建session编写测试。我是否需要将env["omniauth.auth"]变量与post:create一起传递?也许当我试图这样做时,我做错了。我得到的错误如下所示...Rake测试错误1)Error:SessionsControllerTest#test_should_get_create:NoMethodError:undefinedmethod`provider'fornil:NilClassapp/models/user.rb:6:in`from_omniauth'app/controllers/sessi

  10. ruby - session 未创建 : Chrome version must be between - 2

    当使用ruby​​selenium驱动程序驱动chrome时,我得到/home/travis/.rvm/gems/ruby-2.6.2/gems/selenium-webdriver-3.141.5926/lib/selenium/webdriver/remote/response.rb:72:in`assert_ok':sessionnot创建:Chrome版本必须在70和73之间(Selenium::WebDriver::Error::SessionNotCreatedError)如何解决这个问题?降级chrome不是我想做的事。 最佳答案

随机推荐