草庐IT

网易Spark Kyuubi核心架构设计与源码实现剖析

xpleaf 2023-03-28 原文
版权声明:本文为xpleaf(香飘叶子)博主原创文章,遵循CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

本文较为系统、全面并且由浅入深地介绍了网易Spark Kyuubi出现的背景、核心架构设计与关键源码实现,是学习、应用和对Kyuubi进行二次开发不可多得的技术干货,但由于作者认知水平有限,文中难免会出现描述不准确的措辞,还请多多包容和指出。

1 概述

Kyuubi是网易数帆旗下易数大数据团队开源的一个高性能的通用JDBC和SQL执行引擎,建立在Apache Spark之上,Kyuubi的出现,较好的弥补了Spark ThriftServer在多租户、资源隔离和高可用等方面的不足,是一个真正可以满足大多数生产环境场景的开源项目。

通过分析Spark ThriftServer的设计与不足,本文会逐渐带你深入理解Kyuubi的核心设计与实现,同时会选取多个关键场景来剖析其源码,通过本文的阅读,希望能让读者对网易Kyuubi的整体架构设计有一个较为清晰的理解,并能够用在自己的生产环境中解决更多实际应用问题。

本文主要主要选取Kyuubi 1.1.0版本来对其设计与实现进行分析,后续的版本迭代社区加入了数据湖等概念和实现,本文不会对这方面的内容进行探讨。

2 Spark ThriftServer的设计、实现与不足

2.1 产生背景

在最初使用Spark时,只有理解了Spark RDD模型和其提供的各种算子时,才能比较好地使用Spark进行数据处理和分析,显然由于向上层暴露了过多底层实现细节,Spark有一定的高使用门槛,在易用性上对许多初入门用户来说并不太友好。

SparkSQL的出现则较好地解决了这一问题,通过使用SparkSQL提供的简易API,用户只需要有基本的编程基础并且会使用SQL,就可以借助Spark强大的快速分布式计算能力来处理和分析他们的大规模数据集。

而Spark ThriftServer的出现使Spark的易用性又向前迈进了一步,通过提供标准的JDBC接口和命令行终端的方式,平台开发者可以基于其提供的服务来快速构建它们的数据分析应用,普通用户甚至不需要有编程基础即可借助其强大的能力来进行交互式数据分析。

2.2 核心设计

顾名思义,本质上,Spark ThriftServer是一个基于Apache Thrift框架构建并且封装了SparkContext的RPC服务端,或者从Spark的层面来讲,我们也可以说,Spark ThriftServer是一个提供了各种RPC服务的Spark Driver。但不管从哪个角度去看Spark ThriftServer,有一点可以肯定的是,它是一个Server,是需要对外提供服务的,因此其是常驻的进程,并不会像一般我们构建的Spark Application在完成数据处理的工作逻辑后就退出。其整体架构图如下所示:

Apache Thrift是业界流行的RPC框架,通过其提供的接口描述语言(IDL),可以快速构建用于数据通信的并且语言无关的RPC客户端和服务端,在带来高性能的同时,大大降低了开发人员构建RPC服务的成本,因此在大数据生态其有较多的应用场景,比如我们熟知的hiveserver2即是基于Apache Thrift来构建其RPC服务。

当用户通过JDBC或beeline方式执行一条SQL语句时,TThreadPoolServer会接收到该SQL,通过一系列的Session和Operation的管理,最终会使用在启动Spark ThriftServer时已经构建好的SparkContext来执行该SQL,并获取最后的结果集。

从上面的基本分析中我们可以看到,在不考虑Spark ThrfitServer的底层RPC通信框架和业务细节时,其整体实现思路是比较清晰和简单的。

当然实际上要构建一个对外提供SQL能力的RPC服务时,是有许多细节需要考虑的,并且工作量也会非常巨大,Spark ThriftServer在实现时实际上也没有自己重复造轮子,它复用了hiveserver2的许多组件和逻辑,并根据自身的业务需求来对其进行特定改造;同样的,后面当我们去看Kyuubi时,也会发现它复用了hiveserver2和Spark ThriftServer的一些组件和逻辑,并在此基础上创新性地设计自己的一套架构。

2.3 基本实现

这里列举的代码是基于Spark 2.1的源码,新版本在结构上可能有所有区别,但不影响我们对其本质实现原理的理解。

前面提到的TThreadPoolServer是Apache Thrift提供的用于构建RPC Server的一个工作线程池类,在Spark ThriftServer的Service体系结构中,ThriftBinaryService正是使用TThreadPoolServer来构建RPC服务端并对外提供一系列RPC服务接口:

Spark ThriftServer Service体系

ThriftBinaryService基于TThreadPoolServer构建RPC服务端

// org.apache.hive.service.cli.thrift.ThriftBinaryCLIService#run public class ThriftBinaryCLIService extends ThriftCLIService { @Override public void run() { // ...省略其它细节... // TCP Server server = new TThreadPoolServer(sargs); server.setServerEventHandler(serverEventHandler); server.serve(); // ...省略其它细节... } } ThriftBinaryService提供的RPC服务接口

// org.apache.hive.service.cli.thrift.TCLIService.Iface TOpenSessionResp OpenSession(TOpenSessionReq req); TCloseSessionResp CloseSession(TCloseSessionReq req); TGetInfoResp GetInfo(TGetInfoReq req); TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req); TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req); TGetCatalogsResp GetCatalogs(TGetCatalogsReq req); TGetSchemasResp GetSchemas(TGetSchemasReq req); TGetTablesResp GetTables(TGetTablesReq req); TGetTableTypesResp GetTableTypes(TGetTableTypesReq req); TGetColumnsResp GetColumns(TGetColumnsReq req); TGetFunctionsResp GetFunctions(TGetFunctionsReq req); TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req); TCancelOperationResp CancelOperation(TCancelOperationReq req); TCloseOperationResp CloseOperation(TCloseOperationReq req); TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq req); TFetchResultsResp FetchResults(TFetchResultsReq req); TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req); TCancelDelegationTokenResp CancelDelegationToken(TCancelDelegationTokenReq req); TRenewDelegationTokenResp RenewDelegationToken(TRenewDelegationTokenReq req); 可以看到,其提供的相当一部分接口都是提供SQL服务时所必要的能力。

当然,不管是使用标准的JDBC接口还是通过beeline的方式来访问Spark ThriftServer,必然都是通过Spark基于Apache Thrift构建的RPC客户端来访问这些RPC服务接口的,因此我们去看Spark ThriftServer提供的RPC客户端,其提供的方法接口与RPC服务端提供的是对应的,可以参考org.apache.hive.service.cli.thrift.TCLIService.Client

如果比较难以理解,建议可以先研究一下RPC框架的本质,然后再简单使用一下Apache Thrift来构建RPC服务端和客户端,这样就会有一个比较清晰的理解,这里不对其底层框架和原理做更多深入的分析。个人觉得,要理解Spark ThriftServer,或是后面要介绍的Kyubbi,本质上是理解其通信框架,也就是其是怎么使用Apache Thrift来进行通信的,因为其它的细节都是业务实现。

2.4 主要不足

Spark ThriftServer在带来各种便利性的同时,其不足也是显而易见的。

首先,Spark ThriftServer难以满足生产环境下多租户与资源隔离的场景需求。由于一个Spark ThriftServer全局只有一个SparkContext,也即只有一个Spark Application,其在启动时就确定了全局唯一的用户名,因此在Spark ThriftServer的维护人员看来,所有通过Spark ThriftServer下发的SQL都是来自同一用户(也就是启动时确定的全局唯一的用户名),尽管其背后实际上是由使用Spark ThriftServer服务的不同用户下发的,但所有背后的这些用户都共享使用了Spark ThriftServer的资源、权限和数据,因此我们难以单独对某个用户做资源和权限上的控制,操作审计和其它安全策略。

在Spark ThriftServer执行的一条SQL实际上会被转换为一个job执行,如果用户A下发的SQL的job执行时间较长,必然也会阻塞后续用户B下发的SQL的执行。

其次,单个Spark ThriftServer也容易带来单点故障问题。从Spark ThriftServer接受的客户端请求和其与Executor的通信来考虑,Spark ThriftServer本身的可靠性也难以满足生产环境下的需求。

因此,在将Spark ThriftServer应用于生产环境当中,上面提及的问题和局限性都会不可避免,那业界有没有比较好的解决方案呢?网易开源的Spark Kyuubi就给出了比较好的答案。

3 Kyuubi核心架构设计与源码实现剖析

3.1 Kyuubi核心架构设计

Kyuubi的整体架构设计如下:

Kyuubi从整体上可以分为用户层、服务发现层、Kyuubi Server层、Kyuubi Engine层,其整体概述如下:

  • 用户层

    指通过不同方式使用Kyuubi的用户,比如通过JDBC或beeline方式使用Kyuubi的用户。

  • 服务发现层

    服务发现层依赖于Zookeeper实现,其又分为Kyuubi Server层的服务发现和Kyuubi Engine层的服务发现。

  • Kyuubi Server层

    由多个不同的KyuubiServer实例组成,每个KyuubiServer实例本质上为基于Apache Thrift实现的RPC服务端,其接收来自用户的请求,但并不会真正执行该请求的相关SQL操作,只会作为代理转发该请求到Kyuubi Engine层用户所属的SparkSQLEngine实例上。

  • Kyuubi Engine层

    由多个不同的SparkSQLEngine实例组成,每个SparkSQLEngine实例本质上为基于Apache Thrift实现的并且持有一个SparkSession实例的RPC服务端,其接收来自KyuubiServer实例的请求,并通过SparkSession实例来执行。在Kyuubi的USER共享层级上,每个SparkSQLEngine实例都是用户级别的,即不同的用户其会持有不同的SparkSQLEngine实例,以实现用户级别的资源隔离和控制。

下面将会对每一层以及它们的协作与交互展开较为详细的分析。

3.1.1 用户层

用户层就是指实际需要使用Kyuubi服务的用户,它们通过不过的用户名进行标识,以JDBC或beeline方式进行连接。

比如我们可以在beeline中指定以不同用户名进行登录:

使用xpleaf用户名进行登录

./beeline -u 'jdbc:hive2://10.2.10.1:10009' -n xpleaf 使用yyh用户名进行登录

./beeline -u 'jdbc:hive2://10.2.10.1:10010' -n yyh 使用leaf用户名进行登录

./beeline -u 'jdbc:hive2://10.2.10.2:10009' -n leaf 当然,这里的用户名或登录标识并不是可以随意指定或使用的,它应该根据实际使用场景由运维系统管理人员进行分配,并且其背后应当有一整套完整的认证、授权和审计机制,以确保整体系统的安全。

3.1.2 服务发现层

服务发现层主要是指Zookeepr服务以及Kyuubi Server层的KyuubiServer实例和Kyuubi Engine层的SparkSQLEngine在上面注册的命名空间(即node节点),以提供负载均衡和高可用等特性,因此它分为Kyuubi Server层的服务发现和Kyuubi Engine层的服务发现。

Kyuubi Server层的服务发现

Kyuubi Server层的服务发现是需要用户感知的。

KyuubiServer实例在启动之后都会向Zookeeper的/kyuubi节点下面创建关于自己实例信息的节点,主要是包含KyuubiServer实例监听的host和port这两个关键信息,这样用户在连接KyuubiServer时,只需要到Zookeeper的/kyuubi节点下面获取对应的服务信息即可,当有多个KyuubiServer实例时,选取哪一个实例进行登录,这个是由用户自行决定的,Kyuubi本身并不会进行干预。

在实际应用时也可以封装接口实现随机返回实例给用户,以避免直接暴露Kyuubi的底层实现给用户。

另外,KyuubiServer实例是对所有用户共享,并不会存在特定KyuubiServer实例只对特定用户服务的问题。

当然在实际应用时你也可以这么做,比如你可以不对用户暴露服务发现,也就是不对用户暴露Zookeeper,对于不同用户,直接告诉他们相应的KyuubiServer实例连接信息即可。不过这样一来,Kyuubi Server层的高可用就难以保证了。

比如有多个在不同节点上启动的KyuubiServer实例,其在Zookeeper上面注册的信息如下:

/kyuubi/instance1_10.2.10.1:10009 /kyuubi/instance2_10.2.10.1:10010 /kyuubi/instance3_10.2.10.2:10009 Kyuubi Engine层的服务发现

Kyuubi Engine层的服务发现是不需要用户感知的,其属于Kyuubi内部不同组件之间的一种通信协作方式。

SparkSQLEngine实例在启动之后都会向Zookeeper的/kyuubi_USER节点下面创建关于自己实例信息的节点,主要是包含该实例监听的host和port以及其所属user的相关信息,也就是说SparkSQLEngine实例并不是所有用户共享的,它是由用户独享的。

比如Kyuubi系统中有多个不同用户使用了Kyuubi服务,启动了多个SparkSQLEngine实例,其在Zookeeper上面注册的信息如下:

/kyuubi_USER/xpleaf/instance1_10.2.20.1:52643 /kyuubi_USER/yyh/instance2_10.2.10.1:52346 /kyuubi_USER/leaf/instance3_10.2.10.2:51762

3.1.3 Kyuubi Server层

Kyuubi Server层由多个不同的KyuubiServer实例组成,每个KyuubiServer实例本质上为基于Apache Thrift实现的RPC服务端,其接收来自用户的请求,但并不会真正执行该请求的相关SQL操作,只会作为代理转发该请求到Kyuubi Engine层用户所属的SparkSQLEngine实例上。

整个Kyuubi系统中需要存在多少个KyuubiServer实例是由Kyuubi系统管理员决定的,根据实际使用Kyuubi服务的用户数和并发数,可以部署一个或多个KyuubiServer实例,以满足SLA要求。当然后续发现KyuubiServer实例不够时,可以横向动态扩容,只需要在Kyuubi中系统配置好host和port,启动新的KyuubiServer实例即可。

3.1.4 Kyuubi Engine层

Kyuubi Engine层由多个不同的SparkSQLEngine实例组成,每个SparkSQLEngine实例本质上为基于Apache Thrift实现的并且持有一个SparkSession实例的RPC服务端,其接收来自KyuubiServer实例的请求,并通过SparkSession实例来执行。在Kyuubi的USER共享层级上,每个SparkSQLEngine实例都是用户级别的,即不同的用户其会持有不同的SparkSQLEngine实例,以实现用户级别的资源隔离和控制。

SparkSQLEngine实例是针对不同的用户按需启动的。在Kyuubi整体系统启动之后,如果没有用户访问Kyuubi服务,实际上在整个系统中只有一个或多个KyuubiServer实例,当有用户通过JDBC或beeline的方式连接KyuubiServer实例时,其会在Zookeeper上去查找是否存在用户所属的SparkSQLEngine实例,如果没有,则通过spark-submit提交一个Spark应用,而这个Spark应用本身就是SparkSQLEngine,启动后,基于其内部构建的SparkSession实例,即可为特定用户执行相关SQL操作。

3.1.5 整体协作流程

通过前面对各层的介绍,结合KyubbiServer架构图,以用户xpleaf访问Kyuubi服务为例来描述整个流程。

  • 1.Kyuubi系统管理员在大数据集群中启动了3个KyuubiServer实例和1个Zookeeper集群,其中3个KyuubiServer实例的连接信息分别为10.2.10.1:1000910.2.10.1:1001010.2.10.2:1009

  • 2.用户xpleaf通过beeline终端的方式连接了其中一个KyuubiServer实例;

    ./beeline -u 'jdbc:hive2://10.2.10.1:10009' -n xpleaf
    在这里我们假设用户xpleaf事先已经通过管理员告知的方式知道了该KyuubiServer实例的连接信息。

  • 3.KyuubiServer_instance1接收到xpleaf的连接请求,会为该用户创建session会话,同时会去Zookeeper上检查是否已经存在xpleaf所属的SparkSQLEngine实例;

    • 3.1 如果已经存在,则获取其连接信息;
    • 3.2 如果不存在,则通过spark-submit的方式提交一个Spark应用,启动一个SparkSQLEngine实例;
  • 4.KyuubiServer_instance1在Zookeeper上没有找到xpleaf所属的SparkSQLEngine实例信息,其通过spark-submit的方式启动了一个SparkSQLEngine实例;

  • 5.属于xpleaf用户的新的SparkSQLEngine_instance1实例在10.2.10.1节点上进行启动,并且监听的52463端口,启动后,其向Zookeeper注册自己的连接信息/kyuubi_USER/xpleaf/instance1_10.2.10.1:52463

  • 6.KyuubiServer_instance1在检测到SparkSQLEngine_instance1启动成功后,会向其发送创建session会话的连接请求;

  • 7.SparkSQLEngine_instance1收到KyuubiServer_instance1创建session会话的连接请求,则创建一个新的session会话;

  • 8.用户启动beeleine完成并成功创建会话,接着用户执行SQL查询;

    0: jdbc:hive2://10.2.10.1:10009> select * from teacher;
  • 9.KyuubiServer_instance1接收到xpleaf的执行SQL查询的请求,会先检查是否存在xpleaf所属的SparkSQLEngine实例;

  • 10.KyuubiServer_instance1找到xpleaf所属的SparkSQLEngine_instance1实例,接着会为这次执行SQL的操作创建一个Operation;

  • 11.KyuubiServer_instance1根据连接信息创建了一个RPC Client,并且构建SQL执行的RPC请求,发到对应的SparkSQLEngine_instance1实例上;

  • 12.SparkSQLEngine_instance1接收到该请求后,会创建一个该SQL操作的Operation,并且使用其内部的SparkSession实例来进行执行,最后将执行结果返回给KyuubiServer_instance1;

  • 13.KyuubiServer_instance1接收到SparkSQLEngine_instance1的执行结果,返回给用户,这样一次SQL查询操作就完成了。

    0: jdbc:hive2://localhost:10009> select * from teacher; +-----------+------------+--------------+ | database | tableName | isTemporary | +-----------+------------+--------------+ | default | teacher | false | +-----------+------------+--------------+ 1 row selected (0.19 seconds)
透过整体协作流程我们可以看到:

  • 站在用户层视角来看,其为RPC客户端,而为其提供RPC服务的是Kyuubi Server层,在这里,Kyuubi Server是RPC服务端;
  • 站在Kyuubi Server层视角来看,其既是为用户层提供RPC服务的RPC服务端,同时也是使用Kyuubi Engine层RPC服务的RPC客户端;
  • 站在Kyuubi Engine层视角来看,其为RPC服务端,其为Kyuubi Server层提供RPC服务;
Kyuubi在整体Server端和Client端以及其实现功能的设计上,是十分清晰的。

3.2 Kyuubi源码实现剖析

通过前面对Kyuubi各层以及整体协作流程的描述,相信对Kyuubi的核心架构设计会有一个比较清晰的理解,这样再去分析Kyuubi的源码时就会简单很多。

首先我们会来介绍Kyuubi整体的Service体系与组合关系,以对Kyuubi整体核心代码有一个概览性的理解,接着会选取多个关键场景来对Kyuubi的源码进行分析,并且给出每个场景的代码执行流程图。

确实没有办法在较为简短的篇幅里为大家介绍Kyuubi源码的方方面面,但我个人认为不管对于哪个大数据组件,在理解了其底层通信框架的基础上,再选取关于该组件的几个或多个关键场景来分析其源码,基本上对其整体设计就会有概览性的理解,这样后面对于该组件可能出现的Bug进行排查与修复,或是对该组件进行深度定制以满足业务的实际需求,我相信问题都不大——这也就达到了我们的目的,就是去解决实际问题。

当然,在这个过程当中你也可以欣赏到漂亮的代码,这本身也是一种享受。

3.2.1 RPC与Apache Thrift基本概述

RPC

RPC(Remote Procedure Call)远程过程调用,如果按照百度百科的解释会非常羞涩难懂(上面提供的图应该还是《TCP/IP详解卷1:协议》上面的一个图),但实际上我们就可以简单地把它理解为,一个进程调用另外一个进程的服务即可,不管是通过Socket、内存共享或是网络的方式,只要其调用的服务的具体实现不是在调用方的进程内完成的就可以,目前我们见得比较多的是通过网络通信调用服务的方式。

在Java语言层面上比较普遍的RPC实现方式是,反射+网络通信+动态代理的方式来实现RPC,而网络通信由于需要考虑各种性能指标,主要用的Netty或者原生的NIO比较多,Socket一般比较少用,比如可以看一下阿里Doubbo的实现。

如果想加深这方面的理解,可以参考我的一个开源RPC框架,其实就是非常mini版的Doubbo实现:https://github.com/xpleaf/minidubbo,建议有时间可以看下,实际上这会非常有用,因为几乎所有的大数据组件都会用到相关的RPC框架,不管是开源三方的还是其自己实现的(比如Hadoop的就是使用自己实现的一套RPC框架)。

Apache Thrift

Apache Thrift是业界流行的RPC框架,通过其提供的接口描述语言(IDL),可以快速构建用于数据通信的并且语言无关的RPC客户端和服务端,在带来高性能的同时,大大降低了开发人员构建RPC服务的成本,因此在大数据生态其有较多的应用场景,比如我们熟知的hiveserver2即是基于Apache Thrift来构建其RPC服务。

3.2.2 Kyuubi Service体系与组合关系

在看Kyuubi的源码时,我们可以把较多精力放在某几种较重要的类和其体系上,这样有助于我们抓住重点,理解Kyuubi最核心的部分。仅考虑Kyuubi整体的架构设计和实现,比较重要的是Service、Session和Operation等相关的类和体系。

Service体系

Service,顾名思义就是服务,在Kyuubi中,各种不同核心功能的提供都是通过其Service体系下各个实现类来进行提供的。我们前面提到的服务发现层、Kyuubi Server层和Kyuubi Engine层,在代码实现上绝大部分核心功能都是由Kyuubi源码项目的Server类体系来完成的,可以这么说,理解了Service体系涉及类的相关功能,就基本上从源码级别上理解了整个Kyuubi的体系架构设计和实现。

当然这些Service的实现类并不一定使用Service结尾,比如SessionManager、OperationManager等,但基本上从名字我们就能对其功能窥探一二。

其完整的继承关系如下:

基于Kyuubi提供的核心功能,我们可以大致按Kyuubi Server层和Kyuubi Engine层来将整个体系中的Service类进行一个划分:

  • Kyuubi Server层
    • 功能入口 - KyuubiServer:提供main方法,是Kyuubi Server层KyuubiServer实例初始化和启动的入口;
    • 服务发现 - KyuubiServiceDiscovery:封装了zkClient,用来与Zookeeper服务进行交互;
    • 核心功能 - FrontendService:封装了Apache Thrift的TThreadPoolServer,在Kyuubi Server层,其主要用于向用户层提供RPC服务; - KyuubiBackendService:封装了来自用户层不同RPC请求的处理逻辑,比如openSessionexecuteStatementfetchResults等;
    • Session管理 - KyuubiSessionManager:提供对用户层的请求会话(session)管理;
    • Operation管理 - KyuubiOperationManager:提供对用户层的请求操作(operation)管理;
  • Kyuubi Engine层
    • 功能入口 - SparkSQLEngine:提供main方法,是Kyuubi Engine层SparkSQLEngine实例初始化和启动的入口;
    • 服务发现 - EngineServiceDiscovery:封装了zkClient,用来与Zookeeper服务进行交互;
    • 核心功能 - FrontendService:封装了Apache Thrift的TThreadPoolServer,在Kyuubi Engine层,其主要用于向Kyuubi Server层提供RPC服务; - SparkSQLBackendService:封装了来自Kyuubi Server层不同RPC请求的处理逻辑,比如openSessionexecuteStatementfetchResults等;
    • Session管理 - SparkSQLSessionManager:提供对Kyuubi Server层的请求会话(session)管理;
    • Operation管理 - SparkSQLOperationManager:提供对Kyuubi Server层的请求操作(operation)管理;
这里我们只对具体实现类进行归类,因为中间抽象类只是提取多个子类的公共方法,不影响我们对其体系功能的说明和讲解;而以Noop开头的实际上是Kyuubi的测试实现类,因此我们也不展开说明;KinitAuxiliaryService是Kyuubi中用于认证的类,这里我们不对其认证功能实现进行说明。

通过对Service体系各个具体实现类的介绍,再回顾前面对Kyuubi整体架构和协作流程的介绍,其抽象的功能在源码实现类上面就有了一个相对比较清晰的体现,并且基本上也是可以一一对应上的。

Service组合关系

为了理解Kyuubi在源码层面上是如何进行整体协作的,除了前面介绍的Service体系外,我们还有必要理清其各个Service之间的组合关系。

在整个Service体系中,CompositeService这个中间抽象类在设计上是需要额外关注的,它表示的是在它之下的实现类都至少有一个成员为其它Service服务类对象,比如对于KyuubiServer,它的成员则包含有KyuubiBackdServiceKyuubiServiceDiscovery等多个Service实现类,SparkSQLEngine也是如此。

我们将一些关键的Service类及其组合关系梳理如下,这对后面我们分析关键场景的代码执行流程时会提供很清晰的思路参考:

Session与SessionHandle

  • Session
当我们使用通过JDBC或beeline的方式连接Kyuubi时,实际上在Kyuubi内部就为我们创建了一个Session,用以标识本次会话的所有相关信息,后续的所有操作都是基于这次会话来完成的,我们可以在一次会话下执行多个操作(比如多次执行某次SQL,我们只需要建立一次会话连接即可)。

Session在Kyuubi中又分为Kyuubi Server层的Session和Kyuubi Engine层的Session。Kyuubi Server层的Session实现类为KyuubiSessionImpl,用来标识来自用户层的会话连接信息;Kyuubi Engine层的Session实现类为SparkSessionImpl,用来标识来自Kyuubi Server层的会话连接信息。两个Session实现类都有一个共同的抽象父类AbstractSession,用于Session操作的主要功能逻辑都是在该类实现的。

  • SessionHandle
Session对象的存储实际上由SessionManager来完成,在SessionManager内部其通过一个Map来存储Session的详细信息,其中key为SessionHandle,value为Session对象本身。SessionHandle可以理解为就是封装了一个唯一标识一个用户会话的字符串,这样用户在会话建立后进行通信时只需要携带该字符串标识即可,并不需要传输完整的会话信息,以避免网络传输带来的开销。

Operation与OperationHandle

  • Operation
用户在建立会话后执行的相关语句在Kyuubi内部都会抽象为一个个的Operation,比如执行一条SQL语句对应的Operation实现类为Executement,不过需要注意,Operation又分为Kyuubi Server层的KyuubiOperation和Kyuubi Engine层的SparkOperation。Kyuubi Server层的Operation并不会执行真正的操作,它只是一个代理,它会通过RPC Client请求Kyuubi Engine层来执行该Operation,因此所有Operation的真正执行都是在Kyuubi Engine层来完成的。

由于Operation都是建立在Session之下的,所以我们在看前面的组合关系时可以看到,用于管理Operation的OperationManager为SessionManager的成员属性。

  • OperationHandle
Operation对象的存储实际上由OprationManager来完成,在SessioOprationManagerManager内部其通过一个Map来存储Session的详细信息,其中key为OperationHandle,value为Operation对象本身。OperationHandle可以理解为就是封装了一个唯一标识一个用户操作的字符串,这样用户基于会话的操作时只需要携带该字符串标识即可,并不需要传输完整的操作信息,以避免网络传输带来的开销。

第一次提交Operation时还是需要完整信息,后续只需要提供OperationHandle即可,实际上SQL语句的执行在Kyuubi内部是异步执行的,用户端在提交Opeation后即可获得OperationHandle,后续只需要持着该OperationHandle去获取结果即可,我们在分析SQL执行的代码时就可以看到这一点。

3.2.3 Kyuubi启动流程

Kyuubi的启动实际上包含两部分,分别是KyuubiServer的启动和SparkSQLEngine的启动。KyuubiServer实例的启动发生在系统管理员根据实际业务需要启动KyuubiServer实例,这个是手动操作完成的;而SparkSQLEngine实例的启动则是在为用户建立会话时为由KyuubiServer实例通过spark-submit的方式去提交一个Spark应用来完成的。

KyuubiServer启动流程

当我们在Kyuubi的bin目录下去执行./kyuubi run命令去启动KyuubiServer时,就会去执行KyuubiServer的main方法:

def main(args: Array[String]): Unit = { info( """ | Welcome to | __ __ __ | /\ \/\ \ /\ \ __ | \ \ \/'/' __ __ __ __ __ __\ \ \____/\_\ | \ \ , < /\ \/\ \/\ \/\ \/\ \/\ \\ \ '__`\/\ \ | \ \ \\`\\ \ \_\ \ \ \_\ \ \ \_\ \\ \ \L\ \ \ \ | \ \_\ \_\/`____ \ \____/\ \____/ \ \_,__/\ \_\ | \/_/\/_/`/___/> \/___/ \/___/ \/___/ \/_/ | /\___/ | \/__/ """.stripMargin) info(s"Version: $KYUUBI_VERSION, Revision: $REVISION, Branch: $BRANCH," + s" Java: $JAVA_COMPILE_VERSION, Scala: $SCALA_COMPILE_VERSION," + s" Spark: $SPARK_COMPILE_VERSION, Hadoop: $HADOOP_COMPILE_VERSION," + s" Hive: $HIVE_COMPILE_VERSION") info(s"Using Scala ${Properties.versionString}, ${Properties.javaVmName}," + s" ${Properties.javaVersion}") SignalRegister.registerLogger(logger) val conf = new KyuubiConf().loadFileDefaults() UserGroupInformation.setConfiguration(KyuubiHadoopUtils.newHadoopConf(conf)) startServer(conf) } 在加载完配置信息后,通过调用startServer(conf)方法,就开始了KyuubiServer的启动流程:

def startServer(conf: KyuubiConf): KyuubiServer = { if (!ServiceDiscovery.supportServiceDiscovery(conf)) { zkServer.initialize(conf) zkServer.start() conf.set(HA_ZK_QUORUM, zkServer.getConnectString) conf.set(HA_ZK_ACL_ENABLED, false) } val server = new KyuubiServer() server.initialize(conf) server.start() sys.addShutdownHook(server.stop()) server } 可以看到,实际上KyuubiServer的启动包括两部分:初始化和启动。

KyuubiServer的初始化和启动实际上是一个递归初始化和启动的过程。我们前面提到,KyuubiServer为Service体系下的一个CompositeService,参考前面给出的组合关系图,它本身的成员又包含了多个Service对象,它们都保存在保存在serviceList这个成员当中,因此初始化和启动KyuubiServer实际上就是初始化和启动serviceList中所包含的各个Service对象。而这些Service对象本身又可能是CompositeService,因此KyuubiServer的启动和初始化实际上就是一个递归初始化和启动的过程。

// 递归初始化serviceList下的各个服务 override def initialize(conf: KyuubiConf): Unit = { serviceList.foreach(_.initialize(conf)) super.initialize(conf) } // 递归启动serviceList下的各个服务 override def start(): Unit = { serviceList.zipWithIndex.foreach { case (service, idx) => try { service.start() } catch { case NonFatal(e) => error(s"Error starting service ${service.getName}", e) stop(idx) throw new KyuubiException(s"Failed to Start $getName", e) } } super.start() } 这样一来,整个KyuubiServer的启动流程就比较清晰了,这也是我们在最开始就列出其Service体系和组合关系的原因,由于整体的启动流程和细节所包含的代码比较多,我们就没有必要贴代码了,这里我把整个初始化和启动流程步骤的流程图梳理了出来,待会再对其中一些需要重点关注的点进行说明,如下:

我们重点关注一下FontendServiceServiceDiscoveryService的初始化和启动流程。

  • FrontendService的初始化和启动
我们需要重点关注一下FrontendService,因为KyuubiServer实例对外提供RPC服务都是由其作为入口来完成的。

其初始化时主要是获取和设置了Apache Thrift内置的用于构建RPC服务端的TThreadPoolServer的相关参数:

override def initialize(conf: KyuubiConf): Unit = synchronized { this.conf = conf try { hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf) val serverHost = conf.get(FRONTEND_BIND_HOST) serverAddr = serverHost.map(InetAddress.getByName).getOrElse(InetAddress.getLocalHost) portNum = conf.get(FRONTEND_BIND_PORT) val minThreads = conf.get(FRONTEND_MIN_WORKER_THREADS) val maxThreads = conf.get(FRONTEND_MAX_WORKER_THREADS) val keepAliveTime = conf.get(FRONTEND_WORKER_KEEPALIVE_TIME) val executor = ExecutorPoolCaptureOom( name + "Handler-Pool", minThreads, maxThreads, keepAliveTime, oomHook) authFactory = new KyuubiAuthenticationFactory(conf) val transFactory = authFactory.getTTransportFactory val tProcFactory = authFactory.getTProcessorFactory(this) val serverSocket = new ServerSocket(portNum, -1, serverAddr) portNum = serverSocket.getLocalPort val tServerSocket = new TServerSocket(serverSocket) val maxMessageSize = conf.get(FRONTEND_MAX_MESSAGE_SIZE) val requestTimeout = conf.get(FRONTEND_LOGIN_TIMEOUT).toInt val beBackoffSlotLength = conf.get(FRONTEND_LOGIN_BACKOFF_SLOT_LENGTH).toInt val args = new TThreadPoolServer.Args(tServerSocket) .processorFactory(tProcFactory) .transportFactory(transFactory) .protocolFactory(new TBinaryProtocol.Factory) .inputProtocolFactory( new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize)) .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS) .beBackoffSlotLength(beBackoffSlotLength) .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS) .executorService(executor) // TCP Server server = Some(new TThreadPoolServer(args)) server.foreach(_.setServerEventHandler(new FeTServerEventHandler)) info(s"Initializing $name on host ${serverAddr.getCanonicalHostName} at port $portNum with" + s" [$minThreads, $maxThreads] worker threads") } catch { case e: Throwable => throw new KyuubiException( s"Failed to initialize frontend service on $serverAddr:$portNum.", e) } super.initialize(conf) } 可以看到主要是host、port、minThreads、maxThreads、maxMessageSize、requestTimeout等,这些参数都是可配置的,关于其详细作用可以参考KyuubiConf这个类的说明。

其启动比较简单,主要是调用TThreadPoolServerserver()方法来完成:

override def start(): Unit = synchronized { super.start() if(!isStarted) { serverThread = new NamedThreadFactory(getName, false).newThread(this) serverThread.start() isStarted = true } } override def run(): Unit = try { info(s"Starting and exposing JDBC connection at: jdbc:hive2://$connectionUrl/") server.foreach(_.serve()) } catch { case _: InterruptedException => error(s"$getName is interrupted") case t: Throwable => error(s"Error starting $getName", t) System.exit(-1) }
  • ServiceDiscoveryService的初始化和启动
初始化时主要是创建一个用于后续连接ZooKeeper的zkClient:

def namespace: String = _namespace override def initialize(conf: KyuubiConf): Unit = { this.conf = conf _namespace = conf.get(HA_ZK_NAMESPACE) val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT) val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES) setUpZooKeeperAuth(conf) _zkClient = buildZookeeperClient(conf) zkClient.getConnectionStateListenable.addListener(new ConnectionStateListener { private val isConnected = new AtomicBoolean(false) override def stateChanged(client: CuratorFramework, newState: ConnectionState): Unit = { info(s"Zookeeper client connection state changed to: $newState") newState match { case CONNECTED | RECONNECTED => isConnected.set(true) case LOST => isConnected.set(false) val delay = maxRetries.toLong * maxSleepTime connectionChecker.schedule(new Runnable { override def run(): Unit = if (!isConnected.get()) { error(s"Zookeeper client connection state changed to: $newState, but failed to" + s" reconnect in ${delay / 1000} seconds. Give up retry. ") stopGracefully() } }, delay, TimeUnit.MILLISECONDS) case _ => } } }) zkClient.start() super.initialize(conf) } 当然这里还看到其获取了一个HA_ZK_NAMESPACE的配置值,其默认值为kyuubi

val HA_ZK_NAMESPACE: ConfigEntry[String] = buildConf("ha.zookeeper.namespace") .doc("The root directory for the service to deploy its instance uri. Additionally, it will" + " creates a -[username] suffixed root directory for each application") .version("1.0.0") .stringConf .createWithDefault("kyuubi") 在ServiceDiscoveryService进行启动的时候,就会基于该namesapce来构建在Kyuubi Server层进行服务发现所需要的KyuubiServer实例信息:

override def start(): Unit = { val ns = ZKPaths.makePath(null, namespace) try { zkClient .create() .creatingParentsIfNeeded() .withMode(PERSISTENT) .forPath(ns) } catch { case _: NodeExistsException => // do nothing case e: KeeperException => throw new KyuubiException(s"Failed to create namespace '$ns'", e) } val instance = server.connectionUrl val pathPrefix = ZKPaths.makePath( namespace, s"serviceUri=$instance;version=$KYUUBI_VERSION;sequence=") try { _serviceNode = new PersistentEphemeralNode( zkClient, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, instance.getBytes(StandardCharsets.UTF_8)) serviceNode.start() val znodeTimeout = 120 if (!serviceNode.waitForInitialCreate(znodeTimeout, TimeUnit.SECONDS)) { throw new KyuubiException(s"Max znode creation wait time $znodeTimeout s exhausted") } info(s"Created a ${serviceNode.getActualPath} on ZooKeeper for KyuubiServer uri: " + instance) } catch { case e: Exception => if (serviceNode != null) { serviceNode.close() } throw new KyuubiException( s"Unable to create a znode for this server instance: $instance", e) } super.start() } 在这里,就会在Zookeeper的/kyuubi节点下面创建一个包含KyuubiServer实例详细连接信息的节点,假设KyuubiServer实例所配置的host和post分别为10.2.10.110009,那么其所创建的zk节点为:

[zk: localhost:2181(CONNECTED) 87] ls /kyuubi [serviceUri=10.2.10.1:10009;version=1.1.0;sequence=0000000007]
  • KyuubiSessionManager的初始化和启动
我们主要关注一下其启动过程:

// org.apache.kyuubi.session.SessionManager#start override def start(): Unit = { startTimeoutChecker() super.start() } // org.apache.kyuubi.session.SessionManager#startTimeoutChecker private def startTimeoutChecker(): Unit = { val interval = conf.get(SESSION_CHECK_INTERVAL) val timeout = conf.get(SESSION_TIMEOUT) val checkTask = new Runnable { override def run(): Unit = { val current = System.currentTimeMillis if (!shutdown) { for (session <- handleToSession.values().asScala) { if (session.lastAccessTime + timeout <= current && session.getNoOperationTime > timeout) { try { closeSession(session.handle) } catch { case e: KyuubiSQLException => warn(s"Error closing idle session ${session.handle}", e) } } else { session.closeExpiredOperations } } } } } timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS) } 在这里主要完成的事情:

1.获取session check interval;

2.获取session timout;

3.起一个schedule的调度线程;

4.根据interval和timeout对handleToSession的session进行检查;

5.如果session超时(超过timeout没有access),则closesession;

那么对于KyuubiServer的启动过程我们就分析到这里,更多细节部分大家可以结合我的流程图来自行阅读代码即可,实际上当我们把Kyuubi的Service体系和组合关系整理下来之后,再去分析它的启动流程时就会发现简单很多,这个过程中无非就是要关注它的一些相关参数获取和设置是在哪里完成的,它是怎么侦听服务的(真正用于侦听host和port的server的启动)。

SparkSQLEngine启动流程

在KyuubiServer为用户建立会话时会去通过服务发现层去Zookeeper查找该用户是否存在对应的SparkSQLEngine实例,如果没有则通过spark-submit的启动一个属于该用户的SparkSQLEngine实例。

后面在分析KyuubiServer Session建立过程会提到,实际上KyuubiServer是通过调用外部进程命令的方式来提交一个Spark应用的,为了方便分析SparkSQLEngine的启动流程,这里我先将其大致的命令贴出来:

/Users/xpleaf/app/kyuubi-1.1.0-bin-spark-3.0-hadoop2.7/externals/spark-3.0.2-bin-hadoop2.7/bin/spark-submit \ --class org.apache.kyuubi.engine.spark.SparkSQLEngine \ --conf spark.app.name=kyuubi_USER_xpleaf_2dd0b8a8-e8c3-4788-8586-387622630b73 \ --conf spark.hive.server2.thrift.resultset.default.fetch.size=1000 \ --conf spark.kyuubi.ha.zookeeper.namespace=/kyuubi_USER/xpleaf \ --conf spark.kyuubi.ha.zookeeper.quorum=127.0.0.1:2181 \ --conf spark.yarn.tags=KYUUBI \ --conf spark.kyuubi.ha.zookeeper.acl.enabled=false \ --proxy-user xpleaf /Users/xpleaf/app/kyuubi-1.1.0-bin-spark-3.0-hadoop2.7/externals/engines/spark/kyuubi-spark-sql-engine-1.1.0.jar kyuubi-spark-sql-engine-1.1.0.jar是Kyuubi发布版本里面的一个jar包,里面就包含了SparkSQLEngine这个类,通过-class参数我们可以知道,实际上就是要运行SparkSQLEngine的main方法,由于开启了SparkSQLEngine的启动流程。

需要说明的是,提交Sparkk App的这些参数在SparkSQLEngine启动之前都会被设置到SparkSQLEngine的成员变量kyuubiConf当中,获取方法比较简单,通过scala提供的sys.props就可以获取,这些参数在SparkSQLEngine的初始化和启动中都会起到十分关键的作用。

接下来我们看一下SparkSQLEngine的main方法:

def main(args: Array[String]): Unit = { SignalRegister.registerLogger(logger) var spark: SparkSession = null var engine: SparkSQLEngine = null try { spark = createSpark() engine = startEngine(spark) info(KyuubiSparkUtil.diagnostics(spark)) // blocking main thread countDownLatch.await() } catch { case t: Throwable => error("Error start SparkSQLEngine", t) if (engine != null) { engine.stop() } } finally { if (spark != null) { spark.stop() } } } 首先会通过createSpark()创建一个SparkSession对象,后续SQL的真正执行都会交由其去执行,其创建方法如下:

def createSpark(): SparkSession = { val sparkConf = new SparkConf() sparkConf.setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled", "true") sparkConf.setIfMissing("spark.master", "local") sparkConf.setIfMissing("spark.ui.port", "0") val appName = s"kyuubi_${user}_spark_${Instant.now}" sparkConf.setIfMissing("spark.app.name", appName) kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_BIND_PORT, 0) kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY, RetryPolicies.N_TIME.toString) // Pass kyuubi config from spark with `spark.kyuubi` val sparkToKyuubiPrefix = "spark.kyuubi." sparkConf.getAllWithPrefix(sparkToKyuubiPrefix).foreach { case (k, v) => kyuubiConf.set(s"kyuubi.$k", v) } if (logger.isDebugEnabled) { kyuubiConf.getAll.foreach { case (k, v) => debug(s"KyuubiConf: $k = $v") } } val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate() session.sql("SHOW DATABASES") session } 这里主要是设置了一些在创建SparkSession时需要的参数,包括appName、spark运行方式、spark ui的端口等。另外这里还特别对frontend.bind.port参数设置为0,关于该参数本身的定义如下:

val FRONTEND_BIND_PORT: ConfigEntry[Int] = buildConf("frontend.bind.port") .doc("Port of the machine on which to run the frontend service.") .version("1.0.0") .intConf .checkValue(p => p == 0 || (p > 1024 && p < 65535), "Invalid Port number") .createWithDefault(10009) 可以看到其默认值为10009,前面KyuubiServer在构建TThreadPoolServer时就直接使用了默认值,这也是我们启动的KyuubiServer实例侦听10009端口的原因,而在这里,也就是SparkSQLEngine启动时将其设置为0是有原因,我们将在下面继续说明。

创建完成SparkSession后才调用startEngine(spark)方法启动SparkSQLEngine本身:

def startEngine(spark: SparkSession): SparkSQLEngine = { val engine = new SparkSQLEngine(spark) engine.initialize(kyuubiConf) engine.start() sys.addShutdownHook(engine.stop()) currentEngine = Some(engine) engine } 可以看到也是先进行初始化,然后再启动,SparkSQLEngine本身是CompositeService,所以初始化和启动过程跟KyuubiServer是一模一样的(当然其包含的成员会有所差别),都是递归对serviceList中所包含的各个Service对象进行初始化和启动:

  • FrontendService的初始化和启动
FrontendService在SparkSQLEngine中的启动流程与在KyuubiServer中的启动流程是基本一样的,可以参考前面的说明,这里主要说明一些比较细微的差别点。

前面已经设置了frontend.bind.port参数的值为0,在FrontendService这个类当中,它会赋值给portNum这个变量,用以构建TThreadPoolServer所需要的参数ServerSocket对象:

// org.apache.kyuubi.service.FrontendService#initialize val serverSocket = new ServerSocket(portNum, -1, serverAddr) 所以实际上,不管是KyuubiServer还是SparkSQLEngine,其所侦听的端口是在这里构建ServerSocket对象的时候确定下来的,对ServerSocket对象,如果传入一个为0的portNum,则表示使用系统随机分配的端口号,所以这也就是我们在启动了SparkSQLEngine之后看到其侦听的端口号都是随机端口号的原因。

  • ServiceDiscoveryService的初始化和启动
与KyuubiServer类似,这里分析一下其差别点。

前面在通过spark-submit提交应用时传入了--conf spark.kyuubi.ha.zookeeper.namespace=/kyuubi_USER/xpleaf的参数,实际上在SparkSQLEngine初始化KyuubiConfig对象时会设置到KyuubiConfig.HA_ZK_NAMESPACE属性上,因此在ServiceDiscoveryService初始化时获取的namespace实际上就为/kyuubi_USER/xpleaf,而不是默认的kyuubi,这点是需要注意的:

def namespace: String = _namespace override def initialize(conf: KyuubiConf): Unit = { this.conf = conf _namespace = conf.get(HA_ZK_NAMESPACE) // 省略其它代码 } 因此在启动调用start()方法时,其在Zookeeper上构建的znode节点也就不同:

override def start(): Unit = { // 省略其它代码 val instance = server.connectionUrl val pathPrefix = ZKPaths.makePath( namespace, s"serviceUri=$instance;version=$KYUUBI_VERSION;sequence=") // 省略其它代码 } 比如其创建的znode节点为:

[zk: localhost:2181(CONNECTED) 94] ls /kyuubi_USER/xpleaf [serviceUri=10.2.10.1:52643;version=1.1.0;sequence=0000000004]
  • SparkSQLSessionManager的初始化和启动
SparkSQLSessionManager也是继承自SessionManager,因此与KyuubiServer的KyuubiSessionManager一样,其也启动了一个用于检查Session是否超时的checker。

此外,还启动了另外一个checker,如下:

// org.apache.kyuubi.engine.spark.SparkSQLEngine#start override def start(): Unit = { super.start() // Start engine self-terminating checker after all services are ready and it can be reached by // all servers in engine spaces. backendService.sessionManager.startTerminatingChecker() } // org.apache.kyuubi.session.SessionManager#startTerminatingChecker private[kyuubi] def startTerminatingChecker(): Unit = if (!isServer) { // initialize `_latestLogoutTime` at start _latestLogoutTime = System.currentTimeMillis() val interval = conf.get(ENGINE_CHECK_INTERVAL) val idleTimeout = conf.get(ENGINE_IDLE_TIMEOUT) val checkTask = new Runnable { override def run(): Unit = { if (!shutdown && System.currentTimeMillis() - latestLogoutTime > idleTimeout && getOpenSessionCount <= 0) { info(s"Idled for more than $idleTimeout ms, terminating") sys.exit(0) // Note:直接退出整个SparkSQLEngine,也就是App } } } timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS) } 实际上这个checker是在SparkSQLEngine递归初始化和启动其serviceList之前就已经启动,从它的实现当中我们可以看到,当超过一定时时并且SparkSQLEngine维护的Session为0时,整个SparkSQLEngine实例就会退出,这样做的好处就是,如果一个用户的SparkSQLEngine实例长期没有被使用,我们就可以将其占用的资源释放出来,达到节省资源的目的。

3.2.4 Kyuubi Session建立过程

Kyuubi Session的建立实际上包含两部分,分别是KyuubiServer Session建立和SparkSQLEngine Session建立,这两个过程不是独立进行的,KyuubiServer Session的建立伴随着SparkSQLEngine Session的建立,KyuubiServer Session和SparkSQLEngine Session才完整构成了Kyuubi中可用于执行特定Operation操作的Session。

KyuubiServer Session建立过程

当用户通过JDBC或beeline的方式连接Kyuubi时,实际上就开启了KyuubiServer Session的一个建立过程,此时KyuubiServer中FrontedService的OpenSession方法就会被执行:

// org.apache.kyuubi.service.FrontendService#OpenSession override def OpenSession(req: TOpenSessionReq): TOpenSessionResp = { debug(req.toString) info("Client protocol version: " + req.getClient_protocol) val resp = new TOpenSessionResp try { val sessionHandle = getSessionHandle(req, resp) resp.setSessionHandle(sessionHandle.toTSessionHandle) resp.setConfiguration(new java.util.HashMap[String, String]()) resp.setStatus(OK_STATUS) Option(CURRENT_SERVER_CONTEXT.get()).foreach(_.setSessionHandle(sessionHandle)) } catch { case e: Exception => warn("Error opening session: ", e) resp.setStatus(KyuubiSQLException.toTStatus(e, verbose = true)) } resp } 进而开启了KyuubiServer Session建立以及后续SparkSQLEngine实例启动(这部分前面已经单独介绍)、SparkSQLEngine Session建立的过程:

整体流程并不复杂,在执行FrontendService#OpenSession方法时,最终会调用到KyuubiSessionImpl#open方法,这是整个KyuubiServer Session建立最复杂也是最为关键的一个过程,为此我们单独将其流程整理出来进行说明:

流程中其实已经可以比较清晰地说明其过程,这里我们再详细展开说下,其主要分为下面的过程:

  • 服务发现与SparkSQLEngine实例启动
第一次建立特定user的session时,在zk的/kyuubi_USER path下是没有相关user的节点的,比如/kyuubi_USER/xpleaf,因此在代码执行流程中,其获取的值会为None,这就触发了其调用外部命令来启动一个SparkSQLEngine实例:

// org.apache.kyuubi.session.KyuubiSessionImpl#open override def open(): Unit = { super.open() val zkClient = startZookeeperClient(sessionConf) logSessionInfo(s"Connected to Zookeeper") try { getServerHost(zkClient, appZkNamespace) match { case Some((host, port)) => openSession(host, port) case None => sessionConf.setIfMissing(SparkProcessBuilder.APP_KEY, boundAppName.toString) // tag is a seq type with comma-separated sessionConf.set(SparkProcessBuilder.TAG_KEY, sessionConf.getOption(SparkProcessBuilder.TAG_KEY) .map(_ + ",").getOrElse("") + "KYUUBI") sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace) val builder = new SparkProcessBuilder(appUser, sessionConf) try { logSessionInfo(s"Launching SQL engine:\n$builder") val process = builder.start var sh = getServerHost(zkClient, appZkNamespace) val started = System.currentTimeMillis() var exitValue: Option[Int] = None while (sh.isEmpty) { if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) { exitValue = Some(process.exitValue()) if (exitValue.get != 0) { throw builder.getError } } if (started + timeout <= System.currentTimeMillis()) { process.destroyForcibly() throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder", builder.getError) } sh = getServerHost(zkClient, appZkNamespace) } val Some((host, port)) = sh openSession(host, port) } finally { // we must close the process builder whether session open is success or failure since // we have a log capture thread in process builder. builder.close() } } } finally { try { zkClient.close() } catch { case e: IOException => error("Failed to release the zkClient after session established", e) } } } 而调用的外部命令实际上就是我们在前面讲解SparkSQLEngine实例中提到的spark-submit命令:

/Users/xpleaf/app/kyuubi-1.1.0-bin-spark-3.0-hadoop2.7/externals/spark-3.0.2-bin-hadoop2.7/bin/spark-submit \ --class org.apache.kyuubi.engine.spark.SparkSQLEngine \ --conf spark.app.name=kyuubi_USER_xpleaf_2dd0b8a8-e8c3-4788-8586-387622630b73 \ --conf spark.hive.server2.thrift.resultset.default.fetch.size=1000 \ --conf spark.kyuubi.ha.zookeeper.namespace=/kyuubi_USER/xpleaf \ --conf spark.kyuubi.ha.zookeeper.quorum=127.0.0.1:2181 \ --conf spark.yarn.tags=KYUUBI \ --conf spark.kyuubi.ha.zookeeper.acl.enabled=false \ --proxy-user xpleaf /Users/xpleaf/app/kyuubi-1.1.0-bin-spark-3.0-hadoop2.7/externals/engines/spark/kyuubi-spark-sql-engine-1.1.0.jar 之后就是SparkSQLEngine实例的启动过程,其启动完成之后,就会在Zookeeper上面注册自己的节点信息。

对于KyuubiSessionImpl#open方法,在不超时的情况下,循环会一直执行,直到其获取到用户的SparkSQLEngine实例信息,循环结束,进入下面跟SparkSQLEngine实例建立会话的过程。

  • 建立与SparkSQLEngine实例的会话
SparkSQLEngine本质上也是一个RPC服务端,为了与其进行通信以建立会话,就需要构建RPC客户端,这里KyuubiSessionImpl#openSession方法中构建RPC客户端的方法主要是Apache Thrift的一些模板代码,如下:

org.apache.kyuubi.session.KyuubiSessionImpl#openSession private def openSession(host: String, port: Int): Unit = { val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous") val loginTimeout = sessionConf.get(ENGINE_LOGIN_TIMEOUT).toInt transport = PlainSASLHelper.getPlainTransport( user, passwd, new TSocket(host, port, loginTimeout)) if (!transport.isOpen) { logSessionInfo(s"Connecting to engine [$host:$port]") transport.open() logSessionInfo(s"Connected to engine [$host:$port]") } client = new TCLIService.Client(new TBinaryProtocol(transport)) val req = new TOpenSessionReq() req.setUsername(user) req.setPassword(passwd) req.setConfiguration(conf.asJava) logSessionInfo(s"Sending TOpenSessionReq to engine [$host:$port]") val resp = client.OpenSession(req) logSessionInfo(s"Received TOpenSessionResp from engine [$host:$port]") ThriftUtils.verifyTStatus(resp.getStatus) remoteSessionHandle = resp.getSessionHandle sessionManager.operationManager.setConnection(handle, client, remoteSessionHandle) } 在发送请求给SparkSQLEngine的时候,又会触发SparkSQLEngine Session建立的过程(这个接下来说明),在跟其建立完Session之后,KyuubiSessionImpl会将其用于标识用户端会话的sessionHandle、用于跟SparkSQLEngine进行通信的RPC客户端和在SparkSQLEngine实例中进行Session标识的remoteSessionHandle缓存下来,这样在整个Kyuubi体系中,就构建了一个完整的Session映射关系:userSessionInKyuubiServer-RPCClient-KyuubiServerSessionInSparkSQLEngine,后续的Operation都是建立在这样一个体系之下。

KyuubiServer在Session建立完成后会给客户端返回一个SessionHandle,后续客户端在与KyuubiServer进行通信时都会携带该SessionHandle,以标识其用于会话的窗口。

SparkSQLEngine Session建立过程

在接收到来自KyuubiServer的建立会话的RPC请求之后,SparkSQLEngine中FrontedService的OpenSession方法就会被执行,其整体流程与KyuubiServer Session的建立过程是类似的,主要不同在于SparkSQLSessionManager#openSession方法执行上面,如下:

其对应的关键代码如下:

// org.apache.kyuubi.engine.spark.session.SparkSQLSessionManager#openSession override def openSession( protocol: TProtocolVersion, user: String, password: String, ipAddress: String, conf: Map[String, String]): SessionHandle = { info(s"Opening session for $user@$ipAddress") val sessionImpl = new SparkSessionImpl(protocol, user, password, ipAddress, conf, this) val handle = sessionImpl.handle try { val sparkSession = spark.newSession() // 省略非核心代码 sessionImpl.open() operationManager.setSparkSession(handle, sparkSession) setSession(handle, sessionImpl) info(s"$user's session with $handle is opened, current opening sessions" + s" $getOpenSessionCount") handle } catch { case e: Exception => sessionImpl.close() throw KyuubiSQLException(e) } } sessionImpl.open()实际上只是做了日志记录的一些操作,所以其实这里的核心是将创建的Session记录下来。

SparkSQLEngine在Session建立完成后会给KyuubiServer返回一个SessionHandle,后续KyuubiServer在与SparkSQLEngine进行通信时都会携带该SessionHandle,以标识其用于会话的窗口。

3.2.5 Kyuubi SQL执行流程

Kyuubi SQL的执行流程实际上包含两部分,分别是KyuubiServer SQL执行流程和SparkSQLEngine SQL执行流程,其结合起来才是一个完整的SQL执行流程,KyuubiServer只是一个代理,真正的SQL执行是在SparkSQLEngine中完成。

另外由于在Kyuubi中,SQL的执行是异步的,也就是可以先提交一个SQL让其去执行,后续再通过其返回的operationHandle去获取结果,所以在KyuubiServer和SparkSQLEngine内部,SQL的执行流程又可以再细分为提交Statement和FetchResults两个过程,在分别分析KyuubiServer SQL执行流程和SparkSQLEngine SQL执行流程时,我们就是对提交Statment和FetchResults这两个过程来展开详细的分析,整体会有些繁多,但并不复杂。

KyuubiServer SQL执行流程

  • 1.提交Statement
当用户通过JDBC或beeline的方式执行一条SQL语句时,就开启了SQL语句在Kyuubi中的执行流程,此时KyuubiServer中FrontedService的ExecuteStatement方法就会被执行:

override def ExecuteStatement(req: TExecuteStatementReq): TExecuteStatementResp = { debug(req.toString) val resp = new TExecuteStatementResp try { val sessionHandle = SessionHandle(req.getSessionHandle) val statement = req.getStatement val runAsync = req.isRunAsync // val confOverlay = req.getConfOverlay val queryTimeout = req.getQueryTimeout val operationHandle = if (runAsync) { be.executeStatementAsync(sessionHandle, statement, queryTimeout) } else { be.executeStatement(sessionHandle, statement, queryTimeout) } resp.setOperationHandle(operationHandle.toTOperationHandle) resp.setStatus(OK_STATUS) } catch { case e: Exception => warn("Error executing statement: ", e) resp.setStatus(KyuubiSQLException.toTStatus(e)) } resp } runAsync值为true,因此会通过异步的方式来执行SQL,也就是会执行BackendService的executeStatementAsync方法,开启了异步执行SQL的流程:

首先会通过KyuubiOperationManager去创建一个表示执行SQL的ExecuteStatement:

// org.apache.kyuubi.operation.KyuubiOperationManager#newExecuteStatementOperation override def newExecuteStatementOperation( session: Session, statement: String, runAsync: Boolean, queryTimeout: Long): Operation = { val client = getThriftClient(session.handle) val remoteSessionHandle = getRemoteTSessionHandle(session.handle) val operation = new ExecuteStatement(session, client, remoteSessionHandle, statement, runAsync) addOperation(operation) } client实际上就是我们前面在KyuubiServer Session建立过程中建立的用于与SparkSQLEngine通信的RPC客户端,ExecuteStatement需要client来发送执行SQL语句的请求给SparkSQLEngine实例,不过需要注意的是,这里的ExecuteStatement是KyuubiServer体系下的,其类全路径为org.apache.kyuubi.operation.ExecuteStatement,因为后面在分析SparkSQLEngine SQL执行流程时,在SparkSQLEngine体系下也有一个ExecuteStatement,但其类全路径为org.apache.kyuubi.engine.spark.operation.ExecuteStatement

这里的整个流程关键在于后面执行operation.run()方法,进而执行runInternal()方法:

// org.apache.kyuubi.operation.ExecuteStatement#runInternal override protected def runInternal(): Unit = { if (shouldRunAsync) { executeStatement() val sessionManager = session.sessionManager val asyncOperation = new Runnable { override def run(): Unit = waitStatementComplete() } try { val backgroundOperation = sessionManager.submitBackgroundOperation(asyncOperation) setBackgroundHandle(backgroundOperation) } catch onError("submitting query in background, query rejected") } else { setState(OperationState.RUNNING) executeStatement() setState(OperationState.FINISHED) } } 这里会通过异步的方式来执行,其先同步执行executeStatement()方法,然后再提交一个异步线程来执行asyncOperation(sessionManager.submitBackgroundOperation(asyncOperation)实际上就是通过线程池来提交一个线程线程),我们先看一下其executeStatement()方法:

// org.apache.kyuubi.operation.ExecuteStatement#executeStatement private def executeStatement(): Unit = { try { val req = new TExecuteStatementReq(remoteSessionHandle, statement) req.setRunAsync(shouldRunAsync) val resp = client.ExecuteStatement(req) verifyTStatus(resp.getStatus) _remoteOpHandle = resp.getOperationHandle } catch onError() } 这里statement实际上就是要执行的SQL语句,所以本质上就是向SparkSQLEngine发送了一个用于执行SQL语句的RPC请求,这样就会触发SparkSQLEngine执行提交Statement的一个过程(这个接下来会分析),请求成功后,KyuubiServer会将SparkSQLEngine实例用于记录该操作的operationHandle记录下来,就是赋值给成员变量_remoteOpHandle_remoteOpHandle用后续用于查询statement在SparkSQLEngine实例中的执行状态和FetchResults。

执行完executeStatement()方法后,我们再看一下其提交异步线程时所执行的操作,也就是waitStatementComplete()方法:

// org.apache.kyuubi.operation.ExecuteStatement#waitStatementComplete // TODO 主要是更新该Operation的State为FINISHED,这样后面取数据时才知道已经执行完成 private lazy val statusReq = new TGetOperationStatusReq(_remoteOpHandle) private def waitStatementComplete(): Unit = { setState(OperationState.RUNNING) // 因为FetchResults有进行检查,assertState(OperationState.FINISHED) var statusResp = client.GetOperationStatus(statusReq) var isComplete = false while (!isComplete) { getQueryLog() verifyTStatus(statusResp.getStatus) val remoteState = statusResp.getOperationState info(s"Query[$statementId] in ${remoteState.name()}") isComplete = true remoteState match { case INITIALIZED_STATE | PENDING_STATE | RUNNING_STATE => isComplete = false statusResp = client.GetOperationStatus(statusReq) case FINISHED_STATE => setState(OperationState.FINISHED) // 省略其它代码 setOperationException(ke) } } // see if anymore log could be fetched getQueryLog() } 可以看到其主要操作是构建用于查询SparkSQLEngine实例中Operation的执行状态。

再回过来看一下runInternal()方法:

// org.apache.kyuubi.operation.ExecuteStatement#runInternal override protected def runInternal(): Unit = { if (shouldRunAsync) { executeStatement() val sessionManager = session.sessionManager val asyncOperation = new Runnable { override def run(): Unit = waitStatementComplete() } try { val backgroundOperation = sessionManager.submitBackgroundOperation(asyncOperation) setBackgroundHandle(backgroundOperation) } catch onError("submitting query in background, query rejected") } else { // 省略其它代码 } } 这里提交一个线程后的返回结果backgroundOperation实际上为一个FutureTask对象,后续在FetchResults过程中通过该对象就可以知道Operation在SparkSQLEngine实例中的执行状态。

在提交完Statement之后,KyuubiServer会将operationHandle返回给用户端,用于后续获取执行结果。

  • 2.FetchResults
提交完Statement后,用户层的RPC客户端就会去获取结果,此时KyuubiServer中FrontedService的FetchResults方法就会被执行:

// org.apache.kyuubi.service.FrontendService#FetchResults override def FetchResults(req: TFetchResultsReq): TFetchResultsResp = { debug(req.toString) val resp = new TFetchResultsResp try { val operationHandle = OperationHandle(req.getOperationHandle) val orientation = FetchOrientation.getFetchOrientation(req.getOrientation) // 1 means fetching log val fetchLog = req.getFetchType == 1 val maxRows = req.getMaxRows.toInt val rowSet = be.fetchResults(operationHandle, orientation, maxRows, fetchLog) resp.setResults(rowSet) resp.setHasMoreRows(false) resp.setStatus(OK_STATUS) } catch { case e: Exception => warn("Error fetching results: ", e) resp.setStatus(KyuubiSQLException.toTStatus(e)) } resp } 在获取真正执行结果之前,会有多次获取操作日志的请求,也就是req.getFetchType == 1的情况,这里我们只关注fetchLog为false的情况:

获取执行结果的过程就比较简单,主要是调用RPC客户端的FetchResults方法,这样就会触发SparkSQLEngine FetchResults的一个过程(这个接下来会分析),不过在获取执行结果前会检查其执行状态,前面在分析在提交Statement时,异步线程waitStatementComplete()就会请求SparkSQLEngine更新其状态为FINISHED,因此这里可以正常获取执行结果。

SparkSQLEngine SQL执行流程

  • 1.提交Statement
接收到KyuubiServer提交Statement的RPC请求时,此时SparkSQLEngine中FrontedService的ExecuteStatement方法就会被执行,进而触发接下来提交Statement的整个流程:

其整体流程与KyuubiServer是十分相似的,主要区别在于:

1.其创建的Statement为SparkSQLEngine体系下的ExecuteStatement;

2.其异步线程是通过SparkSession来执行SQL语句;

因此我们来看一下其runInternal()方法和异步线程执行的executeStatement()方法:

// org.apache.kyuubi.engine.spark.operation.ExecuteStatement#runInternal override protected def runInternal(): Unit = { if (shouldRunAsync) { val asyncOperation = new Runnable { override def run(): Unit = { OperationLog.setCurrentOperationLog(operationLog) executeStatement() } } try { val sparkSQLSessionManager = session.sessionManager val backgroundHandle = sparkSQLSessionManager.submitBackgroundOperation(asyncOperation) setBackgroundHandle(backgroundHandle) } catch { case rejected: RejectedExecutionException => setState(OperationState.ERROR) val ke = KyuubiSQLException("Error submitting query in background, query rejected", rejected) setOperationException(ke) throw ke } } else { executeStatement() } } // org.apache.kyuubi.engine.spark.operation.ExecuteStatement#executeStatement private def executeStatement(): Unit = { try { setState(OperationState.RUNNING) info(KyuubiSparkUtil.diagnostics(spark)) Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader) spark.sparkContext.setJobGroup(statementId, statement) result = spark.sql(statement) debug(result.queryExecution) iter = new ArrayFetchIterator(result.collect()) setState(OperationState.FINISHED) } catch { onError(cancel = true) } finally { spark.sparkContext.clearJobGroup() } } 可以看到其执行非常简单,就是直接调用SparkSession的sql()方法来执行SQL语句,最后再将结果保存到迭代器iter,并设置执行状态为完成。

在提交完Statement之后,SparkSQLEngine会将operationHandle返回给KyuubiServer,用于后续获取执行结果。

  • 2.FetchResults
接收到KyuubiServer获取结果的RPC请求时,此时SparkSQLEngine中FrontedService的FetchResults方法就会被执行,进而触发接下来FetchResults的整个流程:

整个过程比较简单,就是将iter的结果转换为rowSet的对象格式,最后返回给KyuubiServer。

参考资料

有关网易Spark Kyuubi核心架构设计与源码实现剖析的更多相关文章

  1. ruby-on-rails - Rails - 子类化模型的设计模式是什么? - 2

    我有一个模型:classItem项目有一个属性“商店”基于存储的值,我希望Item对象对特定方法具有不同的行为。Rails中是否有针对此的通用设计模式?如果方法中没有大的if-else语句,这是如何干净利落地完成的? 最佳答案 通常通过Single-TableInheritance. 关于ruby-on-rails-Rails-子类化模型的设计模式是什么?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.co

  2. ruby-on-rails - 使用 rails 4 设计而不更新用户 - 2

    我将应用程序升级到Rails4,一切正常。我可以登录并转到我的编辑页面。也更新了观点。使用标准View时,用户会更新。但是当我添加例如字段:name时,它​​不会在表单中更新。使用devise3.1.1和gem'protected_attributes'我需要在设备或数据库上运行某种更新命令吗?我也搜索过这个地方,找到了许多不同的解决方案,但没有一个会更新我的用户字段。我没有添加任何自定义字段。 最佳答案 如果您想允许额外的参数,您可以在ApplicationController中使用beforefilter,因为Rails4将参数

  3. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  4. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  5. UE4 源码阅读:从引擎启动到Receive Begin Play - 2

    一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame

  6. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  7. LC滤波器设计学习笔记(一)滤波电路入门 - 2

    目录前言滤波电路科普主要分类实际情况单位的概念常用评价参数函数型滤波器简单分析滤波电路构成低通滤波器RC低通滤波器RL低通滤波器高通滤波器RC高通滤波器RL高通滤波器部分摘自《LC滤波器设计与制作》,侵权删。前言最近需要学习放大电路和滤波电路,但是由于只在之前做音乐频谱分析仪的时候简单了解过一点点运放,所以也是相当从零开始学习了。滤波电路科普主要分类滤波器:主要是从不同频率的成分中提取出特定频率的信号。有源滤波器:由RC元件与运算放大器组成的滤波器。可滤除某一次或多次谐波,最普通易于采用的无源滤波器结构是将电感与电容串联,可对主要次谐波(3、5、7)构成低阻抗旁路。无源滤波器:无源滤波器,又称

  8. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

  9. 计算机毕业设计ssm+vue基本微信小程序的小学生兴趣延时班预约小程序 - 2

    项目介绍随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱小学生兴趣延时班预约小程序的设计与开发被用户普遍使用,为方便用户能够可以随时进行小学生兴趣延时班预约小程序的设计与开发的数据信息管理,特开发了小程序的设计与开发的管理系统。小学生兴趣延时班预约小程序的设计与开发的开发利用现有的成熟技术参考,以源代码为模板,分析功能调整与小学生兴趣延时班预约小程序的设计与开发的实际需求相结合,讨论了小学生兴趣延时班预约小程序的设计与开发的使用。开发环境开发说明:前端使用微信微信小程序开发工具:后端使用ssm:VU

  10. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

随机推荐