本文对 Flink 的 Application、Per-Job 和 Session 部署模式进行了对比分析。详细介绍了 Native Kubernetes 场景下的 Application 部署模式,并且对整个启动流程进行了源码分析。
Flink 的部署模式有 Application、Per-Job 和 Session 模式。
Application、Per-Job 和 Session 部署模式的主要区别:
● 集群与作业的生命周期是否一致
● 资源的隔离程度
● 作业的mian()运行在 client 还是集群上
Application 模式的特点:① 作业与 Flink 集群打包在一起,在 JobManager 的启动时候会执行作业的 main 函数直接启动作业,而不需要通过 Flink Client 提交作业。② 作业的生命周期与 Flink 集群的一致,即作业关闭后 Flink 集群也会关闭
说明:Application 模式对比 Per-Job 模式最大的区别是前者使用
executeAsync()提交作业(不阻塞),而后者使用execute()提交作业(阻塞),因此 Application 模式可以运行多个作业
Per-Job 模式的特点:作业与 Flink 集群不是打包在一起,在 JobManager 启动后需要通过 Flink Client 提交作业,即增加了网络传输的压力和客户端的 CPU 资源。
Session 模式的特点:常驻的 JobManager,多个作业共享同一个集群。如果其中一个作业异常导致 TaskManager 关闭,则该 TM 上的全部作业都会重新调度。
资源调度方面:Flink 支持 Kubernetes、Yarn 和 Mesos 资源调度器
Native 是指可以通过底层的资源调度管理器,实现弹性扩缩容。Native Kubernetes Application 是指 Flink 采用 Application 的部署模式,并使用 Kubernetes 进行资源管理。
用户只需要通过 Flink Client/CLI 启动作业。首先通过 K8s 启动 JobManager(deployment)的同时启动作业,然后通过 JobManager 内部的 K8sResourceManager 模块向 K8s 直接申请 TaskManager 的资源并启动,最后当 TM 注册到 JM 后作业就提交到 TM。用户在整个过程无需指定 TaskManager 资源的数量,而是由 JobManager 向 K8s 按需申请的。

Flink Application on Native Kubernetes 的实践案例:
《Flink on K8s 在阿里巴巴的实践》
《Native Flink on K8s 在小红书的实践》
《Flink on K8s 在京东的持续优化实践》

$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=top-speed-windowing-application \
-Dkubernetes.container.image=172.1.45.167:5000/flink:1.13.6-scala_2.11 \
local:///opt/flink/examples/streaming/TopSpeedWindowing.jar
Native Kubernetes Application 模式下,启动脚本 ./bin/flink 的必要参数有 --target kubernetes-application、-Dkubernetes.cluster-id=***、-Dkubernetes.container.image=*** 和 作业 jar 路径 local:///***
public int parseAndRun(String[] args) {
// 省略...
try {
// do action
switch (action) {
case ACTION_RUN:
run(params);
return 0;
// 匹配参数 run-application
case ACTION_RUN_APPLICATION:
runApplication(params);
return 0;
case ACTION_LIST:
list(params);
return 0;
// 省略...
}
protected void runApplication(String[] args) throws Exception {
// 省略...
// 创建 ApplicationDeployer 用于创建 Kubernetes ClusterDescriptor
final ApplicationDeployer deployer =
new ApplicationClusterDeployer(clusterClientServiceLoader);
if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.emptyList());
} else {
// 作业参数,例如 jar 路径、main 函数入口、args 入参等等
programOptions = new ProgramOptions(commandLine);
programOptions.validate();
final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.singletonList(uri.toString()));
}
final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(
programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
// 提交用户的作业并在集群中运行其 main 函数
deployer.run(effectiveConfiguration, applicationConfiguration);
}
public class ApplicationClusterDeployer implements ApplicationDeployer {
// 省略...
public <ClusterID> void run(
final Configuration configuration,
final ApplicationConfiguration applicationConfiguration)
throws Exception {
// 省略...
// 通过 ClusterClientServiceLoader 创建 KubernetesClusterClientFactory
final ClusterClientFactory<ClusterID> clientFactory =
clientServiceLoader.getClusterClientFactory(configuration);
try (final ClusterDescriptor<ClusterID> clusterDescriptor =
clientFactory.createClusterDescriptor(configuration)) {
// 通过 KubernetesClusterClientFactory 创建 KubernetesClusterDescriptor
final ClusterSpecification clusterSpecification =
clientFactory.getClusterSpecification(configuration);
// KubernetesClusterDescriptor 创建 application 集群
clusterDescriptor.deployApplicationCluster(
clusterSpecification, applicationConfiguration);
}
}
}
public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
// 省略...
@Override
public ClusterClientProvider<String> deployApplicationCluster(
final ClusterSpecification clusterSpecification,
final ApplicationConfiguration applicationConfiguration)
throws ClusterDeploymentException {
// 省略...
// 指定集群入口 KubernetesApplicationClusterEntrypoint 部署/启动集群
final ClusterClientProvider<String> clusterClientProvider =
deployClusterInternal(
KubernetesApplicationClusterEntrypoint.class.getName(),
clusterSpecification,
false);
// 省略...
}
private ClusterClientProvider<String> deployClusterInternal(
String entryPoint, ClusterSpecification clusterSpecification, boolean detached)
throws ClusterDeploymentException {
// 省略...
// 设置集群配置,例如启动入口entry、blobserver端口、taskmanager rpc端口、rest端口等等
flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint);
// Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values.
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);
// 省略...
// 配置 JobManager 的 PodTemplate
try {
final KubernetesJobManagerParameters kubernetesJobManagerParameters =
new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
final FlinkPod podTemplate =
kubernetesJobManagerParameters
.getPodTemplateFilePath()
.map(
file ->
KubernetesUtils.loadPodFromTemplateFile(
client, file, Constants.MAIN_CONTAINER_NAME))
.orElse(new FlinkPod.Builder().build());
// 配置 JobManager 的 Deployment
// 配置 Deployment 的过程中,利用 CmdJobManagerDecorator 设置 JobManager main container 的启动命令,即 kubernetes-jobmanager.sh kubernetes-application
final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
podTemplate, kubernetesJobManagerParameters);
client.createJobManagerComponent(kubernetesJobManagerSpec);
return createClusterClientProvider(clusterId);
// 省略...
}
}
}
public class Fabric8FlinkKubeClient implements FlinkKubeClient {
@Override
public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) {
final Deployment deployment = kubernetesJMSpec.getDeployment();
// 省略...
// 利用 Fabric8 Kubernetes Client 创建 JobManager 的 deployment
this.internalClient.resourceList(accompanyingResources).createOrReplace();
}
}
public final class KubernetesApplicationClusterEntrypoint extends ApplicationClusterEntryPoint {
// 省略...
public static void main(final String[] args) {
// 省略...
// 设置作业配置
PackagedProgram program = null;
try {
program = getPackagedProgram(configuration);
} catch (Exception e) {
LOG.error("Could not create application program.", e);
System.exit(1);
}
try {
configureExecution(configuration, program);
} catch (Exception e) {
LOG.error("Could not apply application configuration.", e);
System.exit(1);
}
final KubernetesApplicationClusterEntrypoint kubernetesApplicationClusterEntrypoint =
new KubernetesApplicationClusterEntrypoint(configuration, program);
// 利用 helper 启动集群
ClusterEntrypoint.runClusterEntrypoint(kubernetesApplicationClusterEntrypoint);
}
}
private void runCluster(Configuration configuration, PluginManager pluginManager)
throws Exception {
synchronized (lock) {
// 初始化 rpcserver、haservice、blobserver等
initializeServices(configuration, pluginManager);
// 省略...
// DispatcherResourceManagerComponent,其封装Dispatcher、ResourceManager和WebMonitorEndpoint
final DispatcherResourceManagerComponentFactory
dispatcherResourceManagerComponentFactory =
createDispatcherResourceManagerComponentFactory(configuration);
// 内部使用DispatcherRunnerFactory创建DispatcherRunner
// 接着Dispatcher选主的时候,DefaultDispatcherRunner.grantLeadership() 启动新 dispatcher leader即startNewDispatcherLeaderProcess(),DispatcherLeaderProcess.start()会利用JobDispatcherLeaderProcess.create()创建ApplicationDispatcherBootstrap,最终调用ApplicationDispatcherBootstrap.runApplicationAsync()执行用户作业的main函数
clusterComponent =
dispatcherResourceManagerComponentFactory.create(
configuration,
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
executionGraphInfoStore,
new RpcMetricQueryServiceRetriever(
metricRegistry.getMetricQueryServiceRpcService()),
this);
// 省略...
}
}
当 Dispatcher 选择主节点的时候,DefaultDispatcherRunner.grantLeadership() -> DefaultDispatcherRunner.startNewDispatcherLeaderProcess() -> DispatcherLeaderProcess.start() -> JobDispatcherLeaderProcess.create()创建ApplicationDispatcherBootstrap -> ApplicationDispatcherBootstrap.runApplicationAsync() -> ... -> ClientUtils.executeProgram() 调用作业的 main函数
说明:Dispatcher 选主是利用了 Kubernetes Client 的
LeaderElector,通过KubernetesLeaderElector封装 LeaderElector,最终利用LeaderElectionEventHandler处理选主的回调任务,其样例如下所示。
public class LeaderElectionExample {
public static void main(String[] args) throws Exception {
ApiClient client = Config.defaultClient();
Configuration.setDefaultApiClient(client);
String lockHolderIdentityName = InetAddress.getLocalHost().getHostAddress();
// 创建 ConfigMap 锁
ConfigMapLock lock = new ConfigMapLock( "default", "leader-election-ip", lockHolderIdentityName);
// Leader 选举的配置
LeaderElectionConfig leaderElectionConfig =
new LeaderElectionConfig(lock,
Duration.ofMillis(10000),
Duration.ofMillis(8000),
Duration.ofMillis(2000));
// 初始化 LeaderElector
LeaderElector leaderElector = new LeaderElector(leaderElectionConfig);
// 选举 Leader
leaderElector.run(
() -> {
System.out.println("Do something when getting leadership.");
},
() -> {
System.out.println("Do something when losing leadership.");
});
}
}
Dispatcher 通过 ApplicationDispatcherBootstrap 利用异步线程和反射机制,执行作业的 mian 函数,并且使用轮训的方式不断查询作业的状态,执行步骤如下:
步骤 1:通过 ThreadLocal 控制 Context 对象,在外部创建好 applicationJobIds 的引用列表并且层层传入,然后利用反射执行用户 main 函数;
步骤 2:在 main 函数中通过执行 execute 或 executeAysnc 生成流图并提交作业,接着把作业 ID 保存到 submitJobIds 即 applicationJobIds,因此 ApplicationDispatcherBootstrap 可以获取提交的 jobId
步骤 3:循环每个作业 ID 查询其状态是否为结束状态。如果没有结束,则一直轮训状态;如果全部结束,则退出并关闭集群。

说明:
KubernetesResourceManagerDriver.requestResource通过 Kubernetes 申请资源启动 TaskManager。
我有一个模型:classItem项目有一个属性“商店”基于存储的值,我希望Item对象对特定方法具有不同的行为。Rails中是否有针对此的通用设计模式?如果方法中没有大的if-else语句,这是如何干净利落地完成的? 最佳答案 通常通过Single-TableInheritance. 关于ruby-on-rails-Rails-子类化模型的设计模式是什么?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.co
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
鉴于我有以下迁移:Sequel.migrationdoupdoalter_table:usersdoadd_column:is_admin,:default=>falseend#SequelrunsaDESCRIBEtablestatement,whenthemodelisloaded.#Atthispoint,itdoesnotknowthatusershaveais_adminflag.#Soitfails.@user=User.find(:email=>"admin@fancy-startup.example")@user.is_admin=true@user.save!ende
我是Google云的新手,我正在尝试对其进行首次部署。我的第一个部署是RubyonRails项目。我基本上是在关注thisguideinthegoogleclouddocumentation.唯一的区别是我使用的是我自己的项目,而不是他们提供的“helloworld”项目。这是我的app.yaml文件runtime:customvm:trueentrypoint:bundleexecrackup-p8080-Eproductionconfig.ruresources:cpu:0.5memory_gb:1.3disk_size_gb:10当我转到我的项目目录并运行gcloudprevie
给定一个复杂的对象层次结构,幸运的是它不包含循环引用,我如何实现支持各种格式的序列化?我不是来讨论实际实现的。相反,我正在寻找可能会派上用场的设计模式提示。更准确地说:我正在使用Ruby,我想解析XML和JSON数据以构建复杂的对象层次结构。此外,应该可以将该层次结构序列化为JSON、XML和可能的HTML。我可以为此使用Builder模式吗?在任何提到的情况下,我都有某种结构化数据-无论是在内存中还是文本中-我想用它来构建其他东西。我认为将序列化逻辑与实际业务逻辑分开会很好,这样我以后就可以轻松支持多种XML格式。 最佳答案 我最
我可以在Azure网站上部署RubyonRails吗? 最佳答案 还没有。目前仅支持.NET和PHP。 关于ruby-on-rails-RubyonRails可以部署在Azure网站上吗?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/12964010/
前置步骤我们都操作完了,这篇开始介绍jenkins的集成。话不多说,看操作1、登录进入jenkins后会让你选择安装插件,选择第一个默认的就行。安装完成后设置账号密码,重新登录。2、配置JDK和Git都需要执行路径,所以需要先把执行路径找到,先进入服务器的docker容器,2.1JDK的路径root@69eef9ee86cf:/usr/bin#echo$JAVA_HOME/usr/local/openjdk-82.2Git的路径root@69eef9ee86cf:/#whichgit/usr/bin/git3、先配置JDK和Git。点击:ManageJenkins>>GlobalToolCon
深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal
了解Rails缓存如何工作的人可以真正帮助我。这是嵌套在Rails::Initializer.runblock中的代码:config.after_initializedoSomeClass.const_set'SOME_CONST','SOME_VAL'end现在,如果我运行script/server并发出请求,一切都很好。然而,在我的Rails应用程序的第二个请求中,一切都因单元化常量错误而变得糟糕。在生产模式下,我可以成功发出第二个请求,这意味着常量仍然存在。我已通过将以上内容更改为以下内容来解决问题:config.after_initializedorequire'some_cl
Ocra无法处理需要“tk”的应用程序require'tk'puts'nope'用奥克拉http://github.com/larsch/ocra不起作用(如链接中的一个问题所述)问题:https://github.com/larsch/ocra/issues/29(Ocra是1.9的"new"rubyscript2exe,本质上它用于将rb脚本部署为可执行文件)唯一的问题似乎是缺少tcl的DLL文件我不认为这是一个问题据我所知,问题是缺少tk的DLL文件如果它们是已知的,则可以在执行ocra时将它们包括在内有没有办法知道tk工作所需的DLL依赖项? 最佳答