版权声明:本文为xpleaf(香飘叶子)博主原创文章,遵循CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。本文较为系统、全面并且由浅入深地介绍了网易Spark Kyuubi出现的背景、核心架构设计与关键源码实现,是学习、应用和对Kyuubi进行二次开发不可多得的技术干货,但由于作者认知水平有限,文中难免会出现描述不准确的措辞,还请多多包容和指出。
Apache Thrift是业界流行的RPC框架,通过其提供的接口描述语言(IDL),可以快速构建用于数据通信的并且语言无关的RPC客户端和服务端,在带来高性能的同时,大大降低了开发人员构建RPC服务的成本,因此在大数据生态其有较多的应用场景,比如我们熟知的hiveserver2即是基于Apache Thrift来构建其RPC服务。当用户通过JDBC或beeline方式执行一条SQL语句时,
TThreadPoolServer会接收到该SQL,通过一系列的Session和Operation的管理,最终会使用在启动Spark ThriftServer时已经构建好的SparkContext来执行该SQL,并获取最后的结果集。
从上面的基本分析中我们可以看到,在不考虑Spark ThrfitServer的底层RPC通信框架和业务细节时,其整体实现思路是比较清晰和简单的。
当然实际上要构建一个对外提供SQL能力的RPC服务时,是有许多细节需要考虑的,并且工作量也会非常巨大,Spark ThriftServer在实现时实际上也没有自己重复造轮子,它复用了hiveserver2的许多组件和逻辑,并根据自身的业务需求来对其进行特定改造;同样的,后面当我们去看Kyuubi时,也会发现它复用了hiveserver2和Spark ThriftServer的一些组件和逻辑,并在此基础上创新性地设计自己的一套架构。
这里列举的代码是基于Spark 2.1的源码,新版本在结构上可能有所有区别,但不影响我们对其本质实现原理的理解。前面提到的
TThreadPoolServer是Apache Thrift提供的用于构建RPC Server的一个工作线程池类,在Spark ThriftServer的Service体系结构中,ThriftBinaryService正是使用TThreadPoolServer来构建RPC服务端并对外提供一系列RPC服务接口:
Spark ThriftServer Service体系
ThriftBinaryService基于TThreadPoolServer构建RPC服务端
// org.apache.hive.service.cli.thrift.ThriftBinaryCLIService#run
public class ThriftBinaryCLIService extends ThriftCLIService {
@Override
public void run() {
// ...省略其它细节...
// TCP Server
server = new TThreadPoolServer(sargs);
server.setServerEventHandler(serverEventHandler);
server.serve();
// ...省略其它细节...
}
}
ThriftBinaryService提供的RPC服务接口
// org.apache.hive.service.cli.thrift.TCLIService.Iface
TOpenSessionResp OpenSession(TOpenSessionReq req);
TCloseSessionResp CloseSession(TCloseSessionReq req);
TGetInfoResp GetInfo(TGetInfoReq req);
TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req);
TGetTypeInfoResp GetTypeInfo(TGetTypeInfoReq req);
TGetCatalogsResp GetCatalogs(TGetCatalogsReq req);
TGetSchemasResp GetSchemas(TGetSchemasReq req);
TGetTablesResp GetTables(TGetTablesReq req);
TGetTableTypesResp GetTableTypes(TGetTableTypesReq req);
TGetColumnsResp GetColumns(TGetColumnsReq req);
TGetFunctionsResp GetFunctions(TGetFunctionsReq req);
TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req);
TCancelOperationResp CancelOperation(TCancelOperationReq req);
TCloseOperationResp CloseOperation(TCloseOperationReq req);
TGetResultSetMetadataResp GetResultSetMetadata(TGetResultSetMetadataReq req);
TFetchResultsResp FetchResults(TFetchResultsReq req);
TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req);
TCancelDelegationTokenResp CancelDelegationToken(TCancelDelegationTokenReq req);
TRenewDelegationTokenResp RenewDelegationToken(TRenewDelegationTokenReq req);
可以看到,其提供的相当一部分接口都是提供SQL服务时所必要的能力。
当然,不管是使用标准的JDBC接口还是通过beeline的方式来访问Spark ThriftServer,必然都是通过Spark基于Apache Thrift构建的RPC客户端来访问这些RPC服务接口的,因此我们去看Spark ThriftServer提供的RPC客户端,其提供的方法接口与RPC服务端提供的是对应的,可以参考org.apache.hive.service.cli.thrift.TCLIService.Client。
如果比较难以理解,建议可以先研究一下RPC框架的本质,然后再简单使用一下Apache Thrift来构建RPC服务端和客户端,这样就会有一个比较清晰的理解,这里不对其底层框架和原理做更多深入的分析。个人觉得,要理解Spark ThriftServer,或是后面要介绍的Kyubbi,本质上是理解其通信框架,也就是其是怎么使用Apache Thrift来进行通信的,因为其它的细节都是业务实现。
在Spark ThriftServer执行的一条SQL实际上会被转换为一个job执行,如果用户A下发的SQL的job执行时间较长,必然也会阻塞后续用户B下发的SQL的执行。其次,单个Spark ThriftServer也容易带来单点故障问题。从Spark ThriftServer接受的客户端请求和其与Executor的通信来考虑,Spark ThriftServer本身的可靠性也难以满足生产环境下的需求。 因此,在将Spark ThriftServer应用于生产环境当中,上面提及的问题和局限性都会不可避免,那业界有没有比较好的解决方案呢?网易开源的Spark Kyuubi就给出了比较好的答案。
Kyuubi从整体上可以分为用户层、服务发现层、Kyuubi Server层、Kyuubi Engine层,其整体概述如下:
./beeline -u 'jdbc:hive2://10.2.10.1:10009' -n xpleaf
使用yyh用户名进行登录
./beeline -u 'jdbc:hive2://10.2.10.1:10010' -n yyh
使用leaf用户名进行登录
./beeline -u 'jdbc:hive2://10.2.10.2:10009' -n leaf
当然,这里的用户名或登录标识并不是可以随意指定或使用的,它应该根据实际使用场景由运维系统管理人员进行分配,并且其背后应当有一整套完整的认证、授权和审计机制,以确保整体系统的安全。
/kyuubi节点下面创建关于自己实例信息的节点,主要是包含KyuubiServer实例监听的host和port这两个关键信息,这样用户在连接KyuubiServer时,只需要到Zookeeper的/kyuubi节点下面获取对应的服务信息即可,当有多个KyuubiServer实例时,选取哪一个实例进行登录,这个是由用户自行决定的,Kyuubi本身并不会进行干预。
在实际应用时也可以封装接口实现随机返回实例给用户,以避免直接暴露Kyuubi的底层实现给用户。另外,KyuubiServer实例是对所有用户共享,并不会存在特定KyuubiServer实例只对特定用户服务的问题。
当然在实际应用时你也可以这么做,比如你可以不对用户暴露服务发现,也就是不对用户暴露Zookeeper,对于不同用户,直接告诉他们相应的KyuubiServer实例连接信息即可。不过这样一来,Kyuubi Server层的高可用就难以保证了。比如有多个在不同节点上启动的KyuubiServer实例,其在Zookeeper上面注册的信息如下:
/kyuubi/instance1_10.2.10.1:10009
/kyuubi/instance2_10.2.10.1:10010
/kyuubi/instance3_10.2.10.2:10009
Kyuubi Engine层的服务发现
Kyuubi Engine层的服务发现是不需要用户感知的,其属于Kyuubi内部不同组件之间的一种通信协作方式。
SparkSQLEngine实例在启动之后都会向Zookeeper的/kyuubi_USER节点下面创建关于自己实例信息的节点,主要是包含该实例监听的host和port以及其所属user的相关信息,也就是说SparkSQLEngine实例并不是所有用户共享的,它是由用户独享的。
比如Kyuubi系统中有多个不同用户使用了Kyuubi服务,启动了多个SparkSQLEngine实例,其在Zookeeper上面注册的信息如下:
/kyuubi_USER/xpleaf/instance1_10.2.20.1:52643
/kyuubi_USER/yyh/instance2_10.2.10.1:52346
/kyuubi_USER/leaf/instance3_10.2.10.2:51762
spark-submit提交一个Spark应用,而这个Spark应用本身就是SparkSQLEngine,启动后,基于其内部构建的SparkSession实例,即可为特定用户执行相关SQL操作。
xpleaf访问Kyuubi服务为例来描述整个流程。
10.2.10.1:10009、10.2.10.1:10010和10.2.10.2:1009;
./beeline -u 'jdbc:hive2://10.2.10.1:10009' -n xpleaf
在这里我们假设用户xpleaf事先已经通过管理员告知的方式知道了该KyuubiServer实例的连接信息。
10.2.10.1节点上进行启动,并且监听的52463端口,启动后,其向Zookeeper注册自己的连接信息/kyuubi_USER/xpleaf/instance1_10.2.10.1:52463;
0: jdbc:hive2://10.2.10.1:10009> select * from teacher;
0: jdbc:hive2://localhost:10009> select * from teacher;
+-----------+------------+--------------+
| database | tableName | isTemporary |
+-----------+------------+--------------+
| default | teacher | false |
+-----------+------------+--------------+
1 row selected (0.19 seconds)
确实没有办法在较为简短的篇幅里为大家介绍Kyuubi源码的方方面面,但我个人认为不管对于哪个大数据组件,在理解了其底层通信框架的基础上,再选取关于该组件的几个或多个关键场景来分析其源码,基本上对其整体设计就会有概览性的理解,这样后面对于该组件可能出现的Bug进行排查与修复,或是对该组件进行深度定制以满足业务的实际需求,我相信问题都不大——这也就达到了我们的目的,就是去解决实际问题。 当然,在这个过程当中你也可以欣赏到漂亮的代码,这本身也是一种享受。
如果想加深这方面的理解,可以参考我的一个开源RPC框架,其实就是非常mini版的Doubbo实现:https://github.com/xpleaf/minidubbo,建议有时间可以看下,实际上这会非常有用,因为几乎所有的大数据组件都会用到相关的RPC框架,不管是开源三方的还是其自己实现的(比如Hadoop的就是使用自己实现的一套RPC框架)。Apache Thrift Apache Thrift是业界流行的RPC框架,通过其提供的接口描述语言(IDL),可以快速构建用于数据通信的并且语言无关的RPC客户端和服务端,在带来高性能的同时,大大降低了开发人员构建RPC服务的成本,因此在大数据生态其有较多的应用场景,比如我们熟知的hiveserver2即是基于Apache Thrift来构建其RPC服务。
当然这些Service的实现类并不一定使用Service结尾,比如SessionManager、OperationManager等,但基本上从名字我们就能对其功能窥探一二。其完整的继承关系如下:
基于Kyuubi提供的核心功能,我们可以大致按Kyuubi Server层和Kyuubi Engine层来将整个体系中的Service类进行一个划分:
openSession、executeStatement、fetchResults等;openSession、executeStatement、fetchResults等;KinitAuxiliaryService是Kyuubi中用于认证的类,这里我们不对其认证功能实现进行说明。
通过对Service体系各个具体实现类的介绍,再回顾前面对Kyuubi整体架构和协作流程的介绍,其抽象的功能在源码实现类上面就有了一个相对比较清晰的体现,并且基本上也是可以一一对应上的。
Service组合关系
为了理解Kyuubi在源码层面上是如何进行整体协作的,除了前面介绍的Service体系外,我们还有必要理清其各个Service之间的组合关系。
在整个Service体系中,CompositeService这个中间抽象类在设计上是需要额外关注的,它表示的是在它之下的实现类都至少有一个成员为其它Service服务类对象,比如对于KyuubiServer,它的成员则包含有KyuubiBackdService、KyuubiServiceDiscovery等多个Service实现类,SparkSQLEngine也是如此。
我们将一些关键的Service类及其组合关系梳理如下,这对后面我们分析关键场景的代码执行流程时会提供很清晰的思路参考:
Session与SessionHandle
KyuubiSessionImpl,用来标识来自用户层的会话连接信息;Kyuubi Engine层的Session实现类为SparkSessionImpl,用来标识来自Kyuubi Server层的会话连接信息。两个Session实现类都有一个共同的抽象父类AbstractSession,用于Session操作的主要功能逻辑都是在该类实现的。
Executement,不过需要注意,Operation又分为Kyuubi Server层的KyuubiOperation和Kyuubi Engine层的SparkOperation。Kyuubi Server层的Operation并不会执行真正的操作,它只是一个代理,它会通过RPC Client请求Kyuubi Engine层来执行该Operation,因此所有Operation的真正执行都是在Kyuubi Engine层来完成的。
由于Operation都是建立在Session之下的,所以我们在看前面的组合关系时可以看到,用于管理Operation的OperationManager为SessionManager的成员属性。
第一次提交Operation时还是需要完整信息,后续只需要提供OperationHandle即可,实际上SQL语句的执行在Kyuubi内部是异步执行的,用户端在提交Opeation后即可获得OperationHandle,后续只需要持着该OperationHandle去获取结果即可,我们在分析SQL执行的代码时就可以看到这一点。
./kyuubi run命令去启动KyuubiServer时,就会去执行KyuubiServer的main方法:
def main(args: Array[String]): Unit = {
info(
"""
| Welcome to
| __ __ __
| /\ \/\ \ /\ \ __
| \ \ \/'/' __ __ __ __ __ __\ \ \____/\_\
| \ \ , < /\ \/\ \/\ \/\ \/\ \/\ \\ \ '__`\/\ \
| \ \ \\`\\ \ \_\ \ \ \_\ \ \ \_\ \\ \ \L\ \ \ \
| \ \_\ \_\/`____ \ \____/\ \____/ \ \_,__/\ \_\
| \/_/\/_/`/___/> \/___/ \/___/ \/___/ \/_/
| /\___/
| \/__/
""".stripMargin)
info(s"Version: $KYUUBI_VERSION, Revision: $REVISION, Branch: $BRANCH," +
s" Java: $JAVA_COMPILE_VERSION, Scala: $SCALA_COMPILE_VERSION," +
s" Spark: $SPARK_COMPILE_VERSION, Hadoop: $HADOOP_COMPILE_VERSION," +
s" Hive: $HIVE_COMPILE_VERSION")
info(s"Using Scala ${Properties.versionString}, ${Properties.javaVmName}," +
s" ${Properties.javaVersion}")
SignalRegister.registerLogger(logger)
val conf = new KyuubiConf().loadFileDefaults()
UserGroupInformation.setConfiguration(KyuubiHadoopUtils.newHadoopConf(conf))
startServer(conf)
}
在加载完配置信息后,通过调用startServer(conf)方法,就开始了KyuubiServer的启动流程:
def startServer(conf: KyuubiConf): KyuubiServer = {
if (!ServiceDiscovery.supportServiceDiscovery(conf)) {
zkServer.initialize(conf)
zkServer.start()
conf.set(HA_ZK_QUORUM, zkServer.getConnectString)
conf.set(HA_ZK_ACL_ENABLED, false)
}
val server = new KyuubiServer()
server.initialize(conf)
server.start()
sys.addShutdownHook(server.stop())
server
}
可以看到,实际上KyuubiServer的启动包括两部分:初始化和启动。
KyuubiServer的初始化和启动实际上是一个递归初始化和启动的过程。我们前面提到,KyuubiServer为Service体系下的一个CompositeService,参考前面给出的组合关系图,它本身的成员又包含了多个Service对象,它们都保存在保存在serviceList这个成员当中,因此初始化和启动KyuubiServer实际上就是初始化和启动serviceList中所包含的各个Service对象。而这些Service对象本身又可能是CompositeService,因此KyuubiServer的启动和初始化实际上就是一个递归初始化和启动的过程。
// 递归初始化serviceList下的各个服务
override def initialize(conf: KyuubiConf): Unit = {
serviceList.foreach(_.initialize(conf))
super.initialize(conf)
}
// 递归启动serviceList下的各个服务
override def start(): Unit = {
serviceList.zipWithIndex.foreach { case (service, idx) =>
try {
service.start()
} catch {
case NonFatal(e) =>
error(s"Error starting service ${service.getName}", e)
stop(idx)
throw new KyuubiException(s"Failed to Start $getName", e)
}
}
super.start()
}
这样一来,整个KyuubiServer的启动流程就比较清晰了,这也是我们在最开始就列出其Service体系和组合关系的原因,由于整体的启动流程和细节所包含的代码比较多,我们就没有必要贴代码了,这里我把整个初始化和启动流程步骤的流程图梳理了出来,待会再对其中一些需要重点关注的点进行说明,如下:
我们重点关注一下FontendService和ServiceDiscoveryService的初始化和启动流程。
FrontendService,因为KyuubiServer实例对外提供RPC服务都是由其作为入口来完成的。
其初始化时主要是获取和设置了Apache Thrift内置的用于构建RPC服务端的TThreadPoolServer的相关参数:
override def initialize(conf: KyuubiConf): Unit = synchronized {
this.conf = conf
try {
hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
val serverHost = conf.get(FRONTEND_BIND_HOST)
serverAddr = serverHost.map(InetAddress.getByName).getOrElse(InetAddress.getLocalHost)
portNum = conf.get(FRONTEND_BIND_PORT)
val minThreads = conf.get(FRONTEND_MIN_WORKER_THREADS)
val maxThreads = conf.get(FRONTEND_MAX_WORKER_THREADS)
val keepAliveTime = conf.get(FRONTEND_WORKER_KEEPALIVE_TIME)
val executor = ExecutorPoolCaptureOom(
name + "Handler-Pool",
minThreads, maxThreads,
keepAliveTime,
oomHook)
authFactory = new KyuubiAuthenticationFactory(conf)
val transFactory = authFactory.getTTransportFactory
val tProcFactory = authFactory.getTProcessorFactory(this)
val serverSocket = new ServerSocket(portNum, -1, serverAddr)
portNum = serverSocket.getLocalPort
val tServerSocket = new TServerSocket(serverSocket)
val maxMessageSize = conf.get(FRONTEND_MAX_MESSAGE_SIZE)
val requestTimeout = conf.get(FRONTEND_LOGIN_TIMEOUT).toInt
val beBackoffSlotLength = conf.get(FRONTEND_LOGIN_BACKOFF_SLOT_LENGTH).toInt
val args = new TThreadPoolServer.Args(tServerSocket)
.processorFactory(tProcFactory)
.transportFactory(transFactory)
.protocolFactory(new TBinaryProtocol.Factory)
.inputProtocolFactory(
new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
.requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
.beBackoffSlotLength(beBackoffSlotLength)
.beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
.executorService(executor)
// TCP Server
server = Some(new TThreadPoolServer(args))
server.foreach(_.setServerEventHandler(new FeTServerEventHandler))
info(s"Initializing $name on host ${serverAddr.getCanonicalHostName} at port $portNum with" +
s" [$minThreads, $maxThreads] worker threads")
} catch {
case e: Throwable =>
throw new KyuubiException(
s"Failed to initialize frontend service on $serverAddr:$portNum.", e)
}
super.initialize(conf)
}
可以看到主要是host、port、minThreads、maxThreads、maxMessageSize、requestTimeout等,这些参数都是可配置的,关于其详细作用可以参考KyuubiConf这个类的说明。
其启动比较简单,主要是调用TThreadPoolServer的server()方法来完成:
override def start(): Unit = synchronized {
super.start()
if(!isStarted) {
serverThread = new NamedThreadFactory(getName, false).newThread(this)
serverThread.start()
isStarted = true
}
}
override def run(): Unit = try {
info(s"Starting and exposing JDBC connection at: jdbc:hive2://$connectionUrl/")
server.foreach(_.serve())
} catch {
case _: InterruptedException => error(s"$getName is interrupted")
case t: Throwable =>
error(s"Error starting $getName", t)
System.exit(-1)
}
def namespace: String = _namespace
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
_namespace = conf.get(HA_ZK_NAMESPACE)
val maxSleepTime = conf.get(HA_ZK_CONN_MAX_RETRY_WAIT)
val maxRetries = conf.get(HA_ZK_CONN_MAX_RETRIES)
setUpZooKeeperAuth(conf)
_zkClient = buildZookeeperClient(conf)
zkClient.getConnectionStateListenable.addListener(new ConnectionStateListener {
private val isConnected = new AtomicBoolean(false)
override def stateChanged(client: CuratorFramework, newState: ConnectionState): Unit = {
info(s"Zookeeper client connection state changed to: $newState")
newState match {
case CONNECTED | RECONNECTED => isConnected.set(true)
case LOST =>
isConnected.set(false)
val delay = maxRetries.toLong * maxSleepTime
connectionChecker.schedule(new Runnable {
override def run(): Unit = if (!isConnected.get()) {
error(s"Zookeeper client connection state changed to: $newState, but failed to" +
s" reconnect in ${delay / 1000} seconds. Give up retry. ")
stopGracefully()
}
}, delay, TimeUnit.MILLISECONDS)
case _ =>
}
}
})
zkClient.start()
super.initialize(conf)
}
当然这里还看到其获取了一个HA_ZK_NAMESPACE的配置值,其默认值为kyuubi:
val HA_ZK_NAMESPACE: ConfigEntry[String] = buildConf("ha.zookeeper.namespace")
.doc("The root directory for the service to deploy its instance uri. Additionally, it will" +
" creates a -[username] suffixed root directory for each application")
.version("1.0.0")
.stringConf
.createWithDefault("kyuubi")
在ServiceDiscoveryService进行启动的时候,就会基于该namesapce来构建在Kyuubi Server层进行服务发现所需要的KyuubiServer实例信息:
override def start(): Unit = {
val ns = ZKPaths.makePath(null, namespace)
try {
zkClient
.create()
.creatingParentsIfNeeded()
.withMode(PERSISTENT)
.forPath(ns)
} catch {
case _: NodeExistsException => // do nothing
case e: KeeperException =>
throw new KyuubiException(s"Failed to create namespace '$ns'", e)
}
val instance = server.connectionUrl
val pathPrefix = ZKPaths.makePath(
namespace,
s"serviceUri=$instance;version=$KYUUBI_VERSION;sequence=")
try {
_serviceNode = new PersistentEphemeralNode(
zkClient,
PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL,
pathPrefix,
instance.getBytes(StandardCharsets.UTF_8))
serviceNode.start()
val znodeTimeout = 120
if (!serviceNode.waitForInitialCreate(znodeTimeout, TimeUnit.SECONDS)) {
throw new KyuubiException(s"Max znode creation wait time $znodeTimeout s exhausted")
}
info(s"Created a ${serviceNode.getActualPath} on ZooKeeper for KyuubiServer uri: " + instance)
} catch {
case e: Exception =>
if (serviceNode != null) {
serviceNode.close()
}
throw new KyuubiException(
s"Unable to create a znode for this server instance: $instance", e)
}
super.start()
}
在这里,就会在Zookeeper的/kyuubi节点下面创建一个包含KyuubiServer实例详细连接信息的节点,假设KyuubiServer实例所配置的host和post分别为10.2.10.1和10009,那么其所创建的zk节点为:
[zk: localhost:2181(CONNECTED) 87] ls /kyuubi
[serviceUri=10.2.10.1:10009;version=1.1.0;sequence=0000000007]
// org.apache.kyuubi.session.SessionManager#start
override def start(): Unit = {
startTimeoutChecker()
super.start()
}
// org.apache.kyuubi.session.SessionManager#startTimeoutChecker
private def startTimeoutChecker(): Unit = {
val interval = conf.get(SESSION_CHECK_INTERVAL)
val timeout = conf.get(SESSION_TIMEOUT)
val checkTask = new Runnable {
override def run(): Unit = {
val current = System.currentTimeMillis
if (!shutdown) {
for (session <- handleToSession.values().asScala) {
if (session.lastAccessTime + timeout <= current &&
session.getNoOperationTime > timeout) {
try {
closeSession(session.handle)
} catch {
case e: KyuubiSQLException =>
warn(s"Error closing idle session ${session.handle}", e)
}
} else {
session.closeExpiredOperations
}
}
}
}
}
timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS)
}
在这里主要完成的事情:
1.获取session check interval;
2.获取session timout;
3.起一个schedule的调度线程;
4.根据interval和timeout对handleToSession的session进行检查;
5.如果session超时(超过timeout没有access),则closesession;
那么对于KyuubiServer的启动过程我们就分析到这里,更多细节部分大家可以结合我的流程图来自行阅读代码即可,实际上当我们把Kyuubi的Service体系和组合关系整理下来之后,再去分析它的启动流程时就会发现简单很多,这个过程中无非就是要关注它的一些相关参数获取和设置是在哪里完成的,它是怎么侦听服务的(真正用于侦听host和port的server的启动)。
SparkSQLEngine启动流程
在KyuubiServer为用户建立会话时会去通过服务发现层去Zookeeper查找该用户是否存在对应的SparkSQLEngine实例,如果没有则通过spark-submit的启动一个属于该用户的SparkSQLEngine实例。
后面在分析KyuubiServer Session建立过程会提到,实际上KyuubiServer是通过调用外部进程命令的方式来提交一个Spark应用的,为了方便分析SparkSQLEngine的启动流程,这里我先将其大致的命令贴出来:
/Users/xpleaf/app/kyuubi-1.1.0-bin-spark-3.0-hadoop2.7/externals/spark-3.0.2-bin-hadoop2.7/bin/spark-submit \
--class org.apache.kyuubi.engine.spark.SparkSQLEngine \
--conf spark.app.name=kyuubi_USER_xpleaf_2dd0b8a8-e8c3-4788-8586-387622630b73 \
--conf spark.hive.server2.thrift.resultset.default.fetch.size=1000 \
--conf spark.kyuubi.ha.zookeeper.namespace=/kyuubi_USER/xpleaf \
--conf spark.kyuubi.ha.zookeeper.quorum=127.0.0.1:2181 \
--conf spark.yarn.tags=KYUUBI \
--conf spark.kyuubi.ha.zookeeper.acl.enabled=false \
--proxy-user xpleaf /Users/xpleaf/app/kyuubi-1.1.0-bin-spark-3.0-hadoop2.7/externals/engines/spark/kyuubi-spark-sql-engine-1.1.0.jar
kyuubi-spark-sql-engine-1.1.0.jar是Kyuubi发布版本里面的一个jar包,里面就包含了SparkSQLEngine这个类,通过-class参数我们可以知道,实际上就是要运行SparkSQLEngine的main方法,由于开启了SparkSQLEngine的启动流程。
需要说明的是,提交Sparkk App的这些参数在SparkSQLEngine启动之前都会被设置到SparkSQLEngine的成员变量kyuubiConf当中,获取方法比较简单,通过scala提供的sys.props就可以获取,这些参数在SparkSQLEngine的初始化和启动中都会起到十分关键的作用。
接下来我们看一下SparkSQLEngine的main方法:
def main(args: Array[String]): Unit = {
SignalRegister.registerLogger(logger)
var spark: SparkSession = null
var engine: SparkSQLEngine = null
try {
spark = createSpark()
engine = startEngine(spark)
info(KyuubiSparkUtil.diagnostics(spark))
// blocking main thread
countDownLatch.await()
} catch {
case t: Throwable =>
error("Error start SparkSQLEngine", t)
if (engine != null) {
engine.stop()
}
} finally {
if (spark != null) {
spark.stop()
}
}
}
首先会通过createSpark()创建一个SparkSession对象,后续SQL的真正执行都会交由其去执行,其创建方法如下:
def createSpark(): SparkSession = {
val sparkConf = new SparkConf()
sparkConf.setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled", "true")
sparkConf.setIfMissing("spark.master", "local")
sparkConf.setIfMissing("spark.ui.port", "0")
val appName = s"kyuubi_${user}_spark_${Instant.now}"
sparkConf.setIfMissing("spark.app.name", appName)
kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_BIND_PORT, 0)
kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY, RetryPolicies.N_TIME.toString)
// Pass kyuubi config from spark with `spark.kyuubi`
val sparkToKyuubiPrefix = "spark.kyuubi."
sparkConf.getAllWithPrefix(sparkToKyuubiPrefix).foreach { case (k, v) =>
kyuubiConf.set(s"kyuubi.$k", v)
}
if (logger.isDebugEnabled) {
kyuubiConf.getAll.foreach { case (k, v) =>
debug(s"KyuubiConf: $k = $v")
}
}
val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
session.sql("SHOW DATABASES")
session
}
这里主要是设置了一些在创建SparkSession时需要的参数,包括appName、spark运行方式、spark ui的端口等。另外这里还特别对frontend.bind.port参数设置为0,关于该参数本身的定义如下:
val FRONTEND_BIND_PORT: ConfigEntry[Int] = buildConf("frontend.bind.port")
.doc("Port of the machine on which to run the frontend service.")
.version("1.0.0")
.intConf
.checkValue(p => p == 0 || (p > 1024 && p < 65535), "Invalid Port number")
.createWithDefault(10009)
可以看到其默认值为10009,前面KyuubiServer在构建TThreadPoolServer时就直接使用了默认值,这也是我们启动的KyuubiServer实例侦听10009端口的原因,而在这里,也就是SparkSQLEngine启动时将其设置为0是有原因,我们将在下面继续说明。
创建完成SparkSession后才调用startEngine(spark)方法启动SparkSQLEngine本身:
def startEngine(spark: SparkSession): SparkSQLEngine = {
val engine = new SparkSQLEngine(spark)
engine.initialize(kyuubiConf)
engine.start()
sys.addShutdownHook(engine.stop())
currentEngine = Some(engine)
engine
}
可以看到也是先进行初始化,然后再启动,SparkSQLEngine本身是CompositeService,所以初始化和启动过程跟KyuubiServer是一模一样的(当然其包含的成员会有所差别),都是递归对serviceList中所包含的各个Service对象进行初始化和启动:
frontend.bind.port参数的值为0,在FrontendService这个类当中,它会赋值给portNum这个变量,用以构建TThreadPoolServer所需要的参数ServerSocket对象:
// org.apache.kyuubi.service.FrontendService#initialize
val serverSocket = new ServerSocket(portNum, -1, serverAddr)
所以实际上,不管是KyuubiServer还是SparkSQLEngine,其所侦听的端口是在这里构建ServerSocket对象的时候确定下来的,对ServerSocket对象,如果传入一个为0的portNum,则表示使用系统随机分配的端口号,所以这也就是我们在启动了SparkSQLEngine之后看到其侦听的端口号都是随机端口号的原因。
--conf spark.kyuubi.ha.zookeeper.namespace=/kyuubi_USER/xpleaf的参数,实际上在SparkSQLEngine初始化KyuubiConfig对象时会设置到KyuubiConfig.HA_ZK_NAMESPACE属性上,因此在ServiceDiscoveryService初始化时获取的namespace实际上就为/kyuubi_USER/xpleaf,而不是默认的kyuubi,这点是需要注意的:
def namespace: String = _namespace
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
_namespace = conf.get(HA_ZK_NAMESPACE)
// 省略其它代码
}
因此在启动调用start()方法时,其在Zookeeper上构建的znode节点也就不同:
override def start(): Unit = {
// 省略其它代码
val instance = server.connectionUrl
val pathPrefix = ZKPaths.makePath(
namespace,
s"serviceUri=$instance;version=$KYUUBI_VERSION;sequence=")
// 省略其它代码
}
比如其创建的znode节点为:
[zk: localhost:2181(CONNECTED) 94] ls /kyuubi_USER/xpleaf
[serviceUri=10.2.10.1:52643;version=1.1.0;sequence=0000000004]
// org.apache.kyuubi.engine.spark.SparkSQLEngine#start
override def start(): Unit = {
super.start()
// Start engine self-terminating checker after all services are ready and it can be reached by
// all servers in engine spaces.
backendService.sessionManager.startTerminatingChecker()
}
// org.apache.kyuubi.session.SessionManager#startTerminatingChecker
private[kyuubi] def startTerminatingChecker(): Unit = if (!isServer) {
// initialize `_latestLogoutTime` at start
_latestLogoutTime = System.currentTimeMillis()
val interval = conf.get(ENGINE_CHECK_INTERVAL)
val idleTimeout = conf.get(ENGINE_IDLE_TIMEOUT)
val checkTask = new Runnable {
override def run(): Unit = {
if (!shutdown &&
System.currentTimeMillis() - latestLogoutTime > idleTimeout && getOpenSessionCount <= 0) {
info(s"Idled for more than $idleTimeout ms, terminating")
sys.exit(0) // Note:直接退出整个SparkSQLEngine,也就是App
}
}
}
timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS)
}
实际上这个checker是在SparkSQLEngine递归初始化和启动其serviceList之前就已经启动,从它的实现当中我们可以看到,当超过一定时时并且SparkSQLEngine维护的Session为0时,整个SparkSQLEngine实例就会退出,这样做的好处就是,如果一个用户的SparkSQLEngine实例长期没有被使用,我们就可以将其占用的资源释放出来,达到节省资源的目的。
OpenSession方法就会被执行:
// org.apache.kyuubi.service.FrontendService#OpenSession
override def OpenSession(req: TOpenSessionReq): TOpenSessionResp = {
debug(req.toString)
info("Client protocol version: " + req.getClient_protocol)
val resp = new TOpenSessionResp
try {
val sessionHandle = getSessionHandle(req, resp)
resp.setSessionHandle(sessionHandle.toTSessionHandle)
resp.setConfiguration(new java.util.HashMap[String, String]())
resp.setStatus(OK_STATUS)
Option(CURRENT_SERVER_CONTEXT.get()).foreach(_.setSessionHandle(sessionHandle))
} catch {
case e: Exception =>
warn("Error opening session: ", e)
resp.setStatus(KyuubiSQLException.toTStatus(e, verbose = true))
}
resp
}
进而开启了KyuubiServer Session建立以及后续SparkSQLEngine实例启动(这部分前面已经单独介绍)、SparkSQLEngine Session建立的过程:
整体流程并不复杂,在执行FrontendService#OpenSession方法时,最终会调用到KyuubiSessionImpl#open方法,这是整个KyuubiServer Session建立最复杂也是最为关键的一个过程,为此我们单独将其流程整理出来进行说明:
流程中其实已经可以比较清晰地说明其过程,这里我们再详细展开说下,其主要分为下面的过程:
// org.apache.kyuubi.session.KyuubiSessionImpl#open
override def open(): Unit = {
super.open()
val zkClient = startZookeeperClient(sessionConf)
logSessionInfo(s"Connected to Zookeeper")
try {
getServerHost(zkClient, appZkNamespace) match {
case Some((host, port)) => openSession(host, port)
case None =>
sessionConf.setIfMissing(SparkProcessBuilder.APP_KEY, boundAppName.toString)
// tag is a seq type with comma-separated
sessionConf.set(SparkProcessBuilder.TAG_KEY,
sessionConf.getOption(SparkProcessBuilder.TAG_KEY)
.map(_ + ",").getOrElse("") + "KYUUBI")
sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace)
val builder = new SparkProcessBuilder(appUser, sessionConf)
try {
logSessionInfo(s"Launching SQL engine:\n$builder")
val process = builder.start
var sh = getServerHost(zkClient, appZkNamespace)
val started = System.currentTimeMillis()
var exitValue: Option[Int] = None
while (sh.isEmpty) {
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
exitValue = Some(process.exitValue())
if (exitValue.get != 0) {
throw builder.getError
}
}
if (started + timeout <= System.currentTimeMillis()) {
process.destroyForcibly()
throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder",
builder.getError)
}
sh = getServerHost(zkClient, appZkNamespace)
}
val Some((host, port)) = sh
openSession(host, port)
} finally {
// we must close the process builder whether session open is success or failure since
// we have a log capture thread in process builder.
builder.close()
}
}
} finally {
try {
zkClient.close()
} catch {
case e: IOException => error("Failed to release the zkClient after session established", e)
}
}
}
而调用的外部命令实际上就是我们在前面讲解SparkSQLEngine实例中提到的spark-submit命令:
/Users/xpleaf/app/kyuubi-1.1.0-bin-spark-3.0-hadoop2.7/externals/spark-3.0.2-bin-hadoop2.7/bin/spark-submit \
--class org.apache.kyuubi.engine.spark.SparkSQLEngine \
--conf spark.app.name=kyuubi_USER_xpleaf_2dd0b8a8-e8c3-4788-8586-387622630b73 \
--conf spark.hive.server2.thrift.resultset.default.fetch.size=1000 \
--conf spark.kyuubi.ha.zookeeper.namespace=/kyuubi_USER/xpleaf \
--conf spark.kyuubi.ha.zookeeper.quorum=127.0.0.1:2181 \
--conf spark.yarn.tags=KYUUBI \
--conf spark.kyuubi.ha.zookeeper.acl.enabled=false \
--proxy-user xpleaf /Users/xpleaf/app/kyuubi-1.1.0-bin-spark-3.0-hadoop2.7/externals/engines/spark/kyuubi-spark-sql-engine-1.1.0.jar
之后就是SparkSQLEngine实例的启动过程,其启动完成之后,就会在Zookeeper上面注册自己的节点信息。
对于KyuubiSessionImpl#open方法,在不超时的情况下,循环会一直执行,直到其获取到用户的SparkSQLEngine实例信息,循环结束,进入下面跟SparkSQLEngine实例建立会话的过程。
org.apache.kyuubi.session.KyuubiSessionImpl#openSession
private def openSession(host: String, port: Int): Unit = {
val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
val loginTimeout = sessionConf.get(ENGINE_LOGIN_TIMEOUT).toInt
transport = PlainSASLHelper.getPlainTransport(
user, passwd, new TSocket(host, port, loginTimeout))
if (!transport.isOpen) {
logSessionInfo(s"Connecting to engine [$host:$port]")
transport.open()
logSessionInfo(s"Connected to engine [$host:$port]")
}
client = new TCLIService.Client(new TBinaryProtocol(transport))
val req = new TOpenSessionReq()
req.setUsername(user)
req.setPassword(passwd)
req.setConfiguration(conf.asJava)
logSessionInfo(s"Sending TOpenSessionReq to engine [$host:$port]")
val resp = client.OpenSession(req)
logSessionInfo(s"Received TOpenSessionResp from engine [$host:$port]")
ThriftUtils.verifyTStatus(resp.getStatus)
remoteSessionHandle = resp.getSessionHandle
sessionManager.operationManager.setConnection(handle, client, remoteSessionHandle)
}
在发送请求给SparkSQLEngine的时候,又会触发SparkSQLEngine Session建立的过程(这个接下来说明),在跟其建立完Session之后,KyuubiSessionImpl会将其用于标识用户端会话的sessionHandle、用于跟SparkSQLEngine进行通信的RPC客户端和在SparkSQLEngine实例中进行Session标识的remoteSessionHandle缓存下来,这样在整个Kyuubi体系中,就构建了一个完整的Session映射关系:userSessionInKyuubiServer-RPCClient-KyuubiServerSessionInSparkSQLEngine,后续的Operation都是建立在这样一个体系之下。
KyuubiServer在Session建立完成后会给客户端返回一个SessionHandle,后续客户端在与KyuubiServer进行通信时都会携带该SessionHandle,以标识其用于会话的窗口。
SparkSQLEngine Session建立过程
在接收到来自KyuubiServer的建立会话的RPC请求之后,SparkSQLEngine中FrontedService的OpenSession方法就会被执行,其整体流程与KyuubiServer Session的建立过程是类似的,主要不同在于SparkSQLSessionManager#openSession方法执行上面,如下:
其对应的关键代码如下:
// org.apache.kyuubi.engine.spark.session.SparkSQLSessionManager#openSession
override def openSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
conf: Map[String, String]): SessionHandle = {
info(s"Opening session for $user@$ipAddress")
val sessionImpl = new SparkSessionImpl(protocol, user, password, ipAddress, conf, this)
val handle = sessionImpl.handle
try {
val sparkSession = spark.newSession()
// 省略非核心代码
sessionImpl.open()
operationManager.setSparkSession(handle, sparkSession)
setSession(handle, sessionImpl)
info(s"$user's session with $handle is opened, current opening sessions" +
s" $getOpenSessionCount")
handle
} catch {
case e: Exception =>
sessionImpl.close()
throw KyuubiSQLException(e)
}
}
sessionImpl.open()实际上只是做了日志记录的一些操作,所以其实这里的核心是将创建的Session记录下来。
SparkSQLEngine在Session建立完成后会给KyuubiServer返回一个SessionHandle,后续KyuubiServer在与SparkSQLEngine进行通信时都会携带该SessionHandle,以标识其用于会话的窗口。
ExecuteStatement方法就会被执行:
override def ExecuteStatement(req: TExecuteStatementReq): TExecuteStatementResp = {
debug(req.toString)
val resp = new TExecuteStatementResp
try {
val sessionHandle = SessionHandle(req.getSessionHandle)
val statement = req.getStatement
val runAsync = req.isRunAsync
// val confOverlay = req.getConfOverlay
val queryTimeout = req.getQueryTimeout
val operationHandle = if (runAsync) {
be.executeStatementAsync(sessionHandle, statement, queryTimeout)
} else {
be.executeStatement(sessionHandle, statement, queryTimeout)
}
resp.setOperationHandle(operationHandle.toTOperationHandle)
resp.setStatus(OK_STATUS)
} catch {
case e: Exception =>
warn("Error executing statement: ", e)
resp.setStatus(KyuubiSQLException.toTStatus(e))
}
resp
}
runAsync值为true,因此会通过异步的方式来执行SQL,也就是会执行BackendService的executeStatementAsync方法,开启了异步执行SQL的流程:
首先会通过KyuubiOperationManager去创建一个表示执行SQL的ExecuteStatement:
// org.apache.kyuubi.operation.KyuubiOperationManager#newExecuteStatementOperation
override def newExecuteStatementOperation(
session: Session,
statement: String,
runAsync: Boolean,
queryTimeout: Long): Operation = {
val client = getThriftClient(session.handle)
val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
val operation = new ExecuteStatement(session, client, remoteSessionHandle, statement, runAsync)
addOperation(operation)
}
client实际上就是我们前面在KyuubiServer Session建立过程中建立的用于与SparkSQLEngine通信的RPC客户端,ExecuteStatement需要client来发送执行SQL语句的请求给SparkSQLEngine实例,不过需要注意的是,这里的ExecuteStatement是KyuubiServer体系下的,其类全路径为org.apache.kyuubi.operation.ExecuteStatement,因为后面在分析SparkSQLEngine SQL执行流程时,在SparkSQLEngine体系下也有一个ExecuteStatement,但其类全路径为org.apache.kyuubi.engine.spark.operation.ExecuteStatement。
这里的整个流程关键在于后面执行operation.run()方法,进而执行runInternal()方法:
// org.apache.kyuubi.operation.ExecuteStatement#runInternal
override protected def runInternal(): Unit = {
if (shouldRunAsync) {
executeStatement()
val sessionManager = session.sessionManager
val asyncOperation = new Runnable {
override def run(): Unit = waitStatementComplete()
}
try {
val backgroundOperation =
sessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundOperation)
} catch onError("submitting query in background, query rejected")
} else {
setState(OperationState.RUNNING)
executeStatement()
setState(OperationState.FINISHED)
}
}
这里会通过异步的方式来执行,其先同步执行executeStatement()方法,然后再提交一个异步线程来执行asyncOperation(sessionManager.submitBackgroundOperation(asyncOperation)实际上就是通过线程池来提交一个线程线程),我们先看一下其executeStatement()方法:
// org.apache.kyuubi.operation.ExecuteStatement#executeStatement
private def executeStatement(): Unit = {
try {
val req = new TExecuteStatementReq(remoteSessionHandle, statement)
req.setRunAsync(shouldRunAsync)
val resp = client.ExecuteStatement(req)
verifyTStatus(resp.getStatus)
_remoteOpHandle = resp.getOperationHandle
} catch onError()
}
这里statement实际上就是要执行的SQL语句,所以本质上就是向SparkSQLEngine发送了一个用于执行SQL语句的RPC请求,这样就会触发SparkSQLEngine执行提交Statement的一个过程(这个接下来会分析),请求成功后,KyuubiServer会将SparkSQLEngine实例用于记录该操作的operationHandle记录下来,就是赋值给成员变量_remoteOpHandle,_remoteOpHandle用后续用于查询statement在SparkSQLEngine实例中的执行状态和FetchResults。
执行完executeStatement()方法后,我们再看一下其提交异步线程时所执行的操作,也就是waitStatementComplete()方法:
// org.apache.kyuubi.operation.ExecuteStatement#waitStatementComplete
// TODO 主要是更新该Operation的State为FINISHED,这样后面取数据时才知道已经执行完成
private lazy val statusReq = new TGetOperationStatusReq(_remoteOpHandle)
private def waitStatementComplete(): Unit = {
setState(OperationState.RUNNING) // 因为FetchResults有进行检查,assertState(OperationState.FINISHED)
var statusResp = client.GetOperationStatus(statusReq)
var isComplete = false
while (!isComplete) {
getQueryLog()
verifyTStatus(statusResp.getStatus)
val remoteState = statusResp.getOperationState
info(s"Query[$statementId] in ${remoteState.name()}")
isComplete = true
remoteState match {
case INITIALIZED_STATE | PENDING_STATE | RUNNING_STATE =>
isComplete = false
statusResp = client.GetOperationStatus(statusReq)
case FINISHED_STATE =>
setState(OperationState.FINISHED)
// 省略其它代码
setOperationException(ke)
}
}
// see if anymore log could be fetched
getQueryLog()
}
可以看到其主要操作是构建用于查询SparkSQLEngine实例中Operation的执行状态。
再回过来看一下runInternal()方法:
// org.apache.kyuubi.operation.ExecuteStatement#runInternal
override protected def runInternal(): Unit = {
if (shouldRunAsync) {
executeStatement()
val sessionManager = session.sessionManager
val asyncOperation = new Runnable {
override def run(): Unit = waitStatementComplete()
}
try {
val backgroundOperation =
sessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundOperation)
} catch onError("submitting query in background, query rejected")
} else {
// 省略其它代码
}
}
这里提交一个线程后的返回结果backgroundOperation实际上为一个FutureTask对象,后续在FetchResults过程中通过该对象就可以知道Operation在SparkSQLEngine实例中的执行状态。
在提交完Statement之后,KyuubiServer会将operationHandle返回给用户端,用于后续获取执行结果。
FetchResults方法就会被执行:
// org.apache.kyuubi.service.FrontendService#FetchResults
override def FetchResults(req: TFetchResultsReq): TFetchResultsResp = {
debug(req.toString)
val resp = new TFetchResultsResp
try {
val operationHandle = OperationHandle(req.getOperationHandle)
val orientation = FetchOrientation.getFetchOrientation(req.getOrientation)
// 1 means fetching log
val fetchLog = req.getFetchType == 1
val maxRows = req.getMaxRows.toInt
val rowSet = be.fetchResults(operationHandle, orientation, maxRows, fetchLog)
resp.setResults(rowSet)
resp.setHasMoreRows(false)
resp.setStatus(OK_STATUS)
} catch {
case e: Exception =>
warn("Error fetching results: ", e)
resp.setStatus(KyuubiSQLException.toTStatus(e))
}
resp
}
在获取真正执行结果之前,会有多次获取操作日志的请求,也就是req.getFetchType == 1的情况,这里我们只关注fetchLog为false的情况:
获取执行结果的过程就比较简单,主要是调用RPC客户端的FetchResults方法,这样就会触发SparkSQLEngine FetchResults的一个过程(这个接下来会分析),不过在获取执行结果前会检查其执行状态,前面在分析在提交Statement时,异步线程waitStatementComplete()就会请求SparkSQLEngine更新其状态为FINISHED,因此这里可以正常获取执行结果。
SparkSQLEngine SQL执行流程
ExecuteStatement方法就会被执行,进而触发接下来提交Statement的整个流程:
其整体流程与KyuubiServer是十分相似的,主要区别在于:
1.其创建的Statement为SparkSQLEngine体系下的ExecuteStatement;
2.其异步线程是通过SparkSession来执行SQL语句;
因此我们来看一下其runInternal()方法和异步线程执行的executeStatement()方法:
// org.apache.kyuubi.engine.spark.operation.ExecuteStatement#runInternal
override protected def runInternal(): Unit = {
if (shouldRunAsync) {
val asyncOperation = new Runnable {
override def run(): Unit = {
OperationLog.setCurrentOperationLog(operationLog)
executeStatement()
}
}
try {
val sparkSQLSessionManager = session.sessionManager
val backgroundHandle = sparkSQLSessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(OperationState.ERROR)
val ke = KyuubiSQLException("Error submitting query in background, query rejected",
rejected)
setOperationException(ke)
throw ke
}
} else {
executeStatement()
}
}
// org.apache.kyuubi.engine.spark.operation.ExecuteStatement#executeStatement
private def executeStatement(): Unit = {
try {
setState(OperationState.RUNNING)
info(KyuubiSparkUtil.diagnostics(spark))
Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
spark.sparkContext.setJobGroup(statementId, statement)
result = spark.sql(statement)
debug(result.queryExecution)
iter = new ArrayFetchIterator(result.collect())
setState(OperationState.FINISHED)
} catch {
onError(cancel = true)
} finally {
spark.sparkContext.clearJobGroup()
}
}
可以看到其执行非常简单,就是直接调用SparkSession的sql()方法来执行SQL语句,最后再将结果保存到迭代器iter,并设置执行状态为完成。
在提交完Statement之后,SparkSQLEngine会将operationHandle返回给KyuubiServer,用于后续获取执行结果。
FetchResults方法就会被执行,进而触发接下来FetchResults的整个流程:
整个过程比较简单,就是将iter的结果转换为rowSet的对象格式,最后返回给KyuubiServer。
我有一个模型:classItem项目有一个属性“商店”基于存储的值,我希望Item对象对特定方法具有不同的行为。Rails中是否有针对此的通用设计模式?如果方法中没有大的if-else语句,这是如何干净利落地完成的? 最佳答案 通常通过Single-TableInheritance. 关于ruby-on-rails-Rails-子类化模型的设计模式是什么?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.co
我将应用程序升级到Rails4,一切正常。我可以登录并转到我的编辑页面。也更新了观点。使用标准View时,用户会更新。但是当我添加例如字段:name时,它不会在表单中更新。使用devise3.1.1和gem'protected_attributes'我需要在设备或数据库上运行某种更新命令吗?我也搜索过这个地方,找到了许多不同的解决方案,但没有一个会更新我的用户字段。我没有添加任何自定义字段。 最佳答案 如果您想允许额外的参数,您可以在ApplicationController中使用beforefilter,因为Rails4将参数
我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden
华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o
一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame
C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.
目录前言滤波电路科普主要分类实际情况单位的概念常用评价参数函数型滤波器简单分析滤波电路构成低通滤波器RC低通滤波器RL低通滤波器高通滤波器RC高通滤波器RL高通滤波器部分摘自《LC滤波器设计与制作》,侵权删。前言最近需要学习放大电路和滤波电路,但是由于只在之前做音乐频谱分析仪的时候简单了解过一点点运放,所以也是相当从零开始学习了。滤波电路科普主要分类滤波器:主要是从不同频率的成分中提取出特定频率的信号。有源滤波器:由RC元件与运算放大器组成的滤波器。可滤除某一次或多次谐波,最普通易于采用的无源滤波器结构是将电感与电容串联,可对主要次谐波(3、5、7)构成低阻抗旁路。无源滤波器:无源滤波器,又称
MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO
项目介绍随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱小学生兴趣延时班预约小程序的设计与开发被用户普遍使用,为方便用户能够可以随时进行小学生兴趣延时班预约小程序的设计与开发的数据信息管理,特开发了小程序的设计与开发的管理系统。小学生兴趣延时班预约小程序的设计与开发的开发利用现有的成熟技术参考,以源代码为模板,分析功能调整与小学生兴趣延时班预约小程序的设计与开发的实际需求相结合,讨论了小学生兴趣延时班预约小程序的设计与开发的使用。开发环境开发说明:前端使用微信微信小程序开发工具:后端使用ssm:VU
遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg