草庐IT

DataNode与NameNode交互机制相关代码分析

wangxucumt 2023-03-28 原文
    HDFS Federation是为解决HDFS单点故障而提出的NameNode水平扩展方案,该方案允许HDFS创建多个Namespace以提高集群的扩展性和隔离性。在Federation中新增了block-pool的概念,block-pool就是属于单个Namespace的一组block,每个DataNode为所有的block-pool存储block,可以理解block-pool是一个重新将block划分的逻辑概念,同一个DataNode中可以存储属于多个block-pool的多个block。所以在NameNode和DataNode通信相关的代码方面,也做了很大的改动以支持上述特性。

    在cdh3x中,DataNode与NameNode交互主要集中在DataNode这个类中,类结构比较简单,随着Federation概念的引入,新增了一些比较重要的类来管理逻辑层面划分的block-pool和block-pool下的block分布,并以block-pool为单位来与NameNode进行相关的通信。类图如下

        BPServiceActor类实现Runnable接口,以线程的方式运行,一个BPServiceActor实例可以和一个active或standby模式的NameNode实例进行交互,它是真正的任务执行者。主要有四大职能

    1.预先与NameNode进行握手

    2.向NameNode注册

    3.周期性的向NameNode发送心跳

    4.处理NameNode发送回的命令

    一个BPOfferService实例代表在某个DataNode上的某个block-pool(一个block-pool对应一个独立的Namespace),对block-pool对应的active和standby状态的NameNode进行交互的操作。BPOfferService管理和每个NameNode进行实际通信的BPServiceActor实例,并作为代理与处于active状态和standby状态的两个NameNode进行交互,同时标识与active状态NameNode通信的BPServiceActor实例。相关代码如下

class BPOfferService {   static final Log LOG = DataNode.LOG;   //本block-pool服务代表的Namespace信息,和NameNode握手的第一阶段分配所得   NamespaceInfo bpNSInfo;        //block-pool所在DataNode相关的注册信息,和NameNode握手的第二阶段分配所得   volatile DatanodeRegistration bpRegistration;      //所属datanode实例   private final DataNode dn;        //代表和当前active状态的NameNode关联的BPServiceActor实例   //如果所有NameNode处于standby状态,此属性可以为空   //如果此属性非空,则必指向bpServices集合中的某个实例   private BPServiceActor bpServiceToActive = null;      //在本nameservice服务下指向所有NameNode的BPServiceActor实例   //不论代表的NameNode是active状态还是standby状态   private List<BPServiceActor> bpServices = new CopyOnWriteArrayList<BPServiceActor>();        //构造方法中根据NameNode的地址来初始化BPServiceActor,并加入到bpServices集合中   BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {     Preconditions.checkArgument(!nnAddrs.isEmpty(),         "Must pass at least one NN.");     this.dn = dn;          for (InetSocketAddress addr : nnAddrs) {       this.bpServices.add(new BPServiceActor(addr, this));     }   } }    BlockPoolManager类主要用于管理DataNode上的BPOfferService,对BPOfferService对象的创建,删除,启动,停止,关闭的操作都需要通过BlockPoolManager提供的方法来控制。代码如下

class BlockPoolManager {   private static final Log LOG = DataNode.LOG;      //nameserviceId和BPOfferService的映射集合   private final Map<String, BPOfferService> bpByNameserviceId = Maps.newHashMap();   //blockPoolId和BPOfferService的映射集合   private final Map<String, BPOfferService> bpByBlockPoolId = Maps.newHashMap();   //BPOfferService集合   private final List<BPOfferService> offerServices = Lists.newArrayList();   //当前所属的datanode实例   private final DataNode dn;   //更新NameNode列表时的lock   private final Object refreshNamenodesLock = new Object();      BlockPoolManager(DataNode dn) {     this.dn = dn;   } }    BlockPoolSliceScanner类用于扫描block-pool下的block文件并校验文件是否损坏,它对block和最后的校验时间进行跟踪,目前不提供修改block元数据的操作。一个DataNode对应一个DataBlockScanner,DataBlockScanner对不同block-pool的BlockPoolSliceScanner进行管理。

    BlockPoolSliceStorage用于管理DataNode上对应同一个block pool的BlockPoolSlices集合,由于一个DataNode上可能会挂载多个存储设备,即逻辑上对应多个volume,一个BlockPoolSlice对应一个volume,所以对同一个DataNode上的同一个block pool,可以管理多个BlockPoolSlice。BlockPoolSliceStorage的主要职能如下

    1.对新生成的block-pool对应的存储进行格式化

    2.恢复存储状态以保持一致性

    3.在升级的时候对block-pool进行快照处理

    4.回滚block-pool到上一个快照

    5.删除快照并提交block

    在cdh3x中,DataNode启动过程中与NameNode交互的操作,都是在DataNode类中进行的,包括握手,注册,数据块上报和发送心跳等。代码调用关系如下图所示


握手


注册


数据块上报


发送心跳

      在cdh5.1中,这些操作最终都交给了BPServiceActor来处理,下面来详细分析下具体的代码和相互间的调用关系。

    BlockPoolManager在startDataNode方法中被实例化,startDataNode调用关系如下


DataNode.startDataNode(Configuration conf,                       List<StorageLocation> dataDirs,                      SecureResources resources                      ) throws IOException {     blockPoolManager = new BlockPoolManager(this);     //刷新加载NameNodes     blockPoolManager.refreshNamenodes(conf); } BlockPoolManager.refreshNamenodes(Configuration conf)       throws IOException {     LOG.info("Refresh request received for nameservices: "         + conf.get(DFSConfigKeys.DFS_NAMESERVICES));          //地址映射列表,Map<nameserviceId,<namenodeId,nnAddress>>     Map<String, Map<String, InetSocketAddress>> newAddressMap =        DFSUtil.getNNServiceRpcAddresses(conf);          synchronized (refreshNamenodesLock) {       doRefreshNamenodes(newAddressMap);     } } BlockPoolManager.doRefreshNamenodes(Map<String, Map<String, InetSocketAddress>> addrMap){     Set<String> toRefresh = Sets.newLinkedHashSet();     Set<String> toAdd = Sets.newLinkedHashSet();     Set<String> toRemove;          synchronized (this) {       for (String nameserviceId : addrMap.keySet()) {         if (bpByNameserviceId.containsKey(nameserviceId)) {           //已经存在,可能有更新的nameserviceId           toRefresh.add(nameserviceId);         } else {           //加入新的nameserviceId           toAdd.add(nameserviceId);         }       }              //找出bpByNameserviceId存在的,但不存在于addrMap的nameserviceId       //等待删除       toRemove = Sets.newHashSet(Sets.difference(           bpByNameserviceId.keySet(), addrMap.keySet()));              //启动新的nameservice       if (!toAdd.isEmpty()) {         for (String nsToAdd : toAdd) {           ArrayList<InetSocketAddress> addrs =             Lists.newArrayList(addrMap.get(nsToAdd).values());           //根据NameNode地址集合创建新的BPOfferService实例           BPOfferService bpos = createBPOS(addrs);           //建立nameserviceId到BPOfferService的映射           bpByNameserviceId.put(nsToAdd, bpos);           //加入到offerServices集合           offerServices.add(bpos);         }       }       //启动BPOfferService服务       startAll();     }          //删除toRemove中的nameserviceId的映射关系,并停止相关服务     if (!toRemove.isEmpty()) {       for (String nsToRemove : toRemove) {         BPOfferService bpos = bpByNameserviceId.get(nsToRemove);         bpos.stop();         bpos.join();       }     }          //刷新变化的nameserviceId     if (!toRefresh.isEmpty()) {       for (String nsToRefresh : toRefresh) {         BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);         ArrayList<InetSocketAddress> addrs =           Lists.newArrayList(addrMap.get(nsToRefresh).values());         bpos.refreshNNList(addrs);       }     } } BlockPoolManager.startAll() throws IOException {     try {       UserGroupInformation.getLoginUser().doAs(           new PrivilegedExceptionAction<Object>() {             @Override             public Object run() throws Exception {               for (BPOfferService bpos : offerServices) {                 //启动BPOfferService服务                 bpos.start();               }               return null;             }           });     } catch (InterruptedException ex) {       IOException ioe = new IOException();       ioe.initCause(ex.getCause());       throw ioe;     } } BPOfferService.start() {     for (BPServiceActor actor : bpServices) {       //启动BPServiceActor服务       actor.start();     } }    经过层层调用之后,真正和NameNode进行通信的BPServiceActor服务被启动,启动后的BPServiceActor开始和它对应状态的NameNode进行握手注册等一系列操作,BPServiceActor服务对应的NameNode可能是active或standby状态。详细代码如下

BPServiceActor.run() {     LOG.info(this + " starting to offer service");     try {       while (true) {         try {           //连接到NameNode并进行握手           connectToNNAndHandshake();           break;         } catch (IOException ioe) {           runningState = RunningState.INIT_FAILED;           if (shouldRetryInit()) {             LOG.error("Initialization failed for " + this + " "                 + ioe.getLocalizedMessage());             sleepAndLogInterrupts(5000, "initializing");           } else {             runningState = RunningState.FAILED;             LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);             return;           }         }       }       runningState = RunningState.RUNNING;       while (shouldRun()) {         try {           //循环调用offerService()           //在本方法中,周期性的向NameNode发送心跳并执行NameNode返回的相关命令           offerService();         } catch (Exception ex) {           LOG.error("Exception in BPOfferService for " + this, ex);           sleepAndLogInterrupts(5000, "offering service");         }       }       runningState = RunningState.EXITED;     } catch (Throwable ex) {       LOG.warn("Unexpected exception in block pool " + this, ex);       runningState = RunningState.FAILED;     } finally {       LOG.warn("Ending block pool service for: " + this);       cleanUp();     }   }        BPServiceActor.connectToNNAndHandshake() throws IOException {     //连接到NameNode并获得NameNode代理对象     bpNamenode = dn.connectToNN(nnAddr);     //第一阶段获取NamespaceInfo     NamespaceInfo nsInfo = retrieveNamespaceInfo();          //校验namespaceInfo是否和HA中的其他NameNode信息一致     //并建立blockPoolManager和BPOfferService的对应关系     bpos.verifyAndSetNamespaceInfo(nsInfo);          //第二阶段向NameNode注册     register();   }    上述主要分析了加入Federation特性和HA特性后,DataNode和NameNode在代码层面交互方式的改变,相比之前的代码,逻辑更加清晰并且类之间的耦合度更低。

有关DataNode与NameNode交互机制相关代码分析的更多相关文章

  1. ruby - 如何在 buildr 项目中使用 Ruby 代码? - 2

    如何在buildr项目中使用Ruby?我在很多不同的项目中使用过Ruby、JRuby、Java和Clojure。我目前正在使用我的标准Ruby开发一个模拟应用程序,我想尝试使用Clojure后端(我确实喜欢功能代码)以及JRubygui和测试套件。我还可以看到在未来的不同项目中使用Scala作为后端。我想我要为我的项目尝试一下buildr(http://buildr.apache.org/),但我注意到buildr似乎没有设置为在项目中使用JRuby代码本身!这看起来有点傻,因为该工具旨在统一通用的JVM语言并且是在ruby中构建的。除了将输出的jar包含在一个独特的、仅限ruby​​

  2. ruby-on-rails - Rails 源代码 : initialize hash in a weird way? - 2

    在rails源中:https://github.com/rails/rails/blob/master/activesupport/lib/active_support/lazy_load_hooks.rb可以看到以下内容@load_hooks=Hash.new{|h,k|h[k]=[]}在IRB中,它只是初始化一个空哈希。和做有什么区别@load_hooks=Hash.new 最佳答案 查看rubydocumentationforHashnew→new_hashclicktotogglesourcenew(obj)→new_has

  3. ruby-on-rails - 如何在 ruby​​ 交互式 shell 中有多行? - 2

    这可能是个愚蠢的问题。但是,我是一个新手......你怎么能在交互式ruby​​shell中有多行代码?好像你只能有一条长线。按回车键运行代码。无论如何我可以在不运行代码的情况下跳到下一行吗?再次抱歉,如果这是一个愚蠢的问题。谢谢。 最佳答案 这是一个例子:2.1.2:053>a=1=>12.1.2:054>b=2=>22.1.2:055>a+b=>32.1.2:056>ifa>b#Thecode‘if..."startsthedefinitionoftheconditionalstatement.2.1.2:057?>puts"f

  4. ruby-on-rails - 相关表上的范围为 "WHERE ... LIKE" - 2

    我正在尝试从Postgresql表(table1)中获取数据,该表由另一个相关表(property)的字段(table2)过滤。在纯SQL中,我会这样编写查询:SELECT*FROMtable1JOINtable2USING(table2_id)WHEREtable2.propertyLIKE'query%'这工作正常:scope:my_scope,->(query){includes(:table2).where("table2.property":query)}但我真正需要的是使用LIKE运算符进行过滤,而不是严格相等。然而,这是行不通的:scope:my_scope,->(que

  5. ruby-on-rails - 浏览 Ruby 源代码 - 2

    我的主要目标是能够完全理解我正在使用的库/gem。我尝试在Github上从头到尾阅读源代码,但这真的很难。我认为更有趣、更温和的踏脚石就是在使用时阅读每个库/gem方法的源代码。例如,我想知道RubyonRails中的redirect_to方法是如何工作的:如何查找redirect_to方法的源代码?我知道在pry中我可以执行类似show-methodmethod的操作,但我如何才能对Rails框架中的方法执行此操作?您对我如何更好地理解Gem及其API有什么建议吗?仅仅阅读源代码似乎真的很难,尤其是对于框架。谢谢! 最佳答案 Ru

  6. ruby - 模块嵌套代码风格偏好 - 2

    我的假设是moduleAmoduleBendend和moduleA::Bend是一样的。我能够从thisblog找到解决方案,thisSOthread和andthisSOthread.为什么以及什么时候应该更喜欢紧凑语法A::B而不是另一个,因为它显然有一个缺点?我有一种直觉,它可能与性能有关,因为在更多命名空间中查找常量需要更多计算。但是我无法通过对普通类进行基准测试来验证这一点。 最佳答案 这两种写作方法经常被混淆。首先要说的是,据我所知,没有可衡量的性能差异。(在下面的书面示例中不断查找)最明显的区别,可能也是最著名的,是你的

  7. ruby - 寻找通过阅读代码确定编程语言的ruby gem? - 2

    几个月前,我读了一篇关于ruby​​gem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题:

  8. ruby - Net::HTTP 获取源代码和状态 - 2

    我目前正在使用以下方法获取页面的源代码:Net::HTTP.get(URI.parse(page.url))我还想获取HTTP状态,而无需发出第二个请求。有没有办法用另一种方法做到这一点?我一直在查看文档,但似乎找不到我要找的东西。 最佳答案 在我看来,除非您需要一些真正的低级访问或控制,否则最好使用Ruby的内置Open::URI模块:require'open-uri'io=open('http://www.example.org/')#=>#body=io.read[0,50]#=>"["200","OK"]io.base_ur

  9. 程序员如何提高代码能力? - 2

    前言作为一名程序员,自己的本质工作就是做程序开发,那么程序开发的时候最直接的体现就是代码,检验一个程序员技术水平的一个核心环节就是开发时候的代码能力。众所周知,程序开发的水平提升是一个循序渐进的过程,每一位程序员都是从“菜鸟”变成“大神”的,所以程序员在程序开发过程中的代码能力也是根据平时开发中的业务实践来积累和提升的。提高代码能力核心要素程序员要想提高自身代码能力,尤其是新晋程序员的代码能力有很大的提升空间的时候,需要针对性的去提高自己的代码能力。提高代码能力其实有几个比较关键的点,只要把握住这些方面,就能很好的、快速的提高自己的一部分代码能力。1、多去阅读开源项目,如有机会可以亲自参与开源

  10. 7个大一C语言必学的程序 / C语言经典代码大全 - 2

    嗨~大家好,这里是可莉!今天给大家带来的是7个C语言的经典基础代码~那一起往下看下去把【程序一】打印100到200之间的素数#includeintmain(){ inti; for(i=100;i 【程序二】输出乘法口诀表#includeintmain(){inti;for(i=1;i 【程序三】判断1000年---2000年之间的闰年#includeintmain(){intyear;for(year=1000;year 【程序四】给定两个整形变量的值,将两个值的内容进行交换。这里提供两种方法来进行交换,第一种为创建临时变量来进行交换,第二种是不创建临时变量而直接进行交换。1.创建临时变量来

随机推荐