草庐IT

【Flink on k8s】Native Kubernetes Application 部署模式详解

熊本极客 2023-09-28 原文

本文对 Flink 的 Application、Per-Job 和 Session 部署模式进行了对比分析。详细介绍了 Native Kubernetes 场景下的 Application 部署模式,并且对整个启动流程进行了源码分析

1.Native Kubernetes Application 简介

1.1 Flink 部署模式简介

Flink 的部署模式有 Application、Per-Job 和 Session 模式

Application、Per-Job 和 Session 部署模式的主要区别:
● 集群与作业的生命周期是否一致
● 资源的隔离程度
● 作业的 mian() 运行在 client 还是集群上

Application 模式的特点:① 作业与 Flink 集群打包在一起,在 JobManager 的启动时候会执行作业的 main 函数直接启动作业,而不需要通过 Flink Client 提交作业。② 作业的生命周期与 Flink 集群的一致,即作业关闭后 Flink 集群也会关闭

说明:Application 模式对比 Per-Job 模式最大的区别是前者使用 executeAsync() 提交作业(不阻塞),而后者使用 execute() 提交作业(阻塞),因此 Application 模式可以运行多个作业

Per-Job 模式的特点:作业与 Flink 集群不是打包在一起,在 JobManager 启动后需要通过 Flink Client 提交作业,即增加了网络传输的压力和客户端的 CPU 资源。

Session 模式的特点:常驻的 JobManager,多个作业共享同一个集群。如果其中一个作业异常导致 TaskManager 关闭,则该 TM 上的全部作业都会重新调度。

部署模式汇总.PNG

1.2 Flink Native Kubernetes Application 架构图

资源调度方面:Flink 支持 Kubernetes、Yarn 和 Mesos 资源调度器

Native 是指可以通过底层的资源调度管理器,实现弹性扩缩容。Native Kubernetes Application 是指 Flink 采用 Application 的部署模式,并使用 Kubernetes 进行资源管理。

用户只需要通过 Flink Client/CLI 启动作业。首先通过 K8s 启动 JobManager(deployment)的同时启动作业,然后通过 JobManager 内部的 K8sResourceManager 模块向 K8s 直接申请 TaskManager 的资源并启动,最后当 TM 注册到 JM 后作业就提交到 TM。用户在整个过程无需指定 TaskManager 资源的数量,而是由 JobManager 向 K8s 按需申请的。

flink native kubernetes application 架构图.png

Flink Application on Native Kubernetes 的实践案例
《Flink on K8s 在阿里巴巴的实践》
《Native Flink on K8s 在小红书的实践》
《Flink on K8s 在京东的持续优化实践》

2.启动流程详解

2.1 启动流程总览

image.png

2.2 启动脚本及其配置

$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=top-speed-windowing-application \
    -Dkubernetes.container.image=172.1.45.167:5000/flink:1.13.6-scala_2.11 \
    local:///opt/flink/examples/streaming/TopSpeedWindowing.jar

Native Kubernetes Application 模式下,启动脚本 ./bin/flink 的必要参数有 --target kubernetes-application-Dkubernetes.cluster-id=***-Dkubernetes.container.image=*** 和 作业 jar 路径 local:///***

2.3 启动 JobManager 和作业

2.3.1 CliFrontend 入口

    public int parseAndRun(String[] args) {
        // 省略...
        try {
            // do action
            switch (action) {
                case ACTION_RUN:
                    run(params);
                    return 0;
                 // 匹配参数 run-application
                case ACTION_RUN_APPLICATION:
                    runApplication(params);
                    return 0;
                case ACTION_LIST:
                    list(params);
                    return 0;
                // 省略...
    }


    protected void runApplication(String[] args) throws Exception {
        //  省略...

        //  创建 ApplicationDeployer 用于创建 Kubernetes ClusterDescriptor
        final ApplicationDeployer deployer =
                new ApplicationClusterDeployer(clusterClientServiceLoader);

        if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
            programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
            effectiveConfiguration =
                    getEffectiveConfiguration(
                            activeCommandLine,
                            commandLine,
                            programOptions,
                            Collections.emptyList());
        } else {
            //  作业参数,例如 jar 路径、main 函数入口、args 入参等等
            programOptions = new ProgramOptions(commandLine);
            programOptions.validate();
            final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
            effectiveConfiguration =
                    getEffectiveConfiguration(
                            activeCommandLine,
                            commandLine,
                            programOptions,
                            Collections.singletonList(uri.toString()));
        }

        final ApplicationConfiguration applicationConfiguration =
                new ApplicationConfiguration(
                        programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
        //  提交用户的作业并在集群中运行其 main 函数
        deployer.run(effectiveConfiguration, applicationConfiguration);
    }

2.3.2 Flink Client 通过 K8s Client 创建集群

public class ApplicationClusterDeployer implements ApplicationDeployer {
    // 省略...
    public <ClusterID> void run(
            final Configuration configuration,
            final ApplicationConfiguration applicationConfiguration)
            throws Exception {
        // 省略...
        // 通过 ClusterClientServiceLoader 创建 KubernetesClusterClientFactory
        final ClusterClientFactory<ClusterID> clientFactory =
                clientServiceLoader.getClusterClientFactory(configuration);
        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                clientFactory.createClusterDescriptor(configuration)) {
            // 通过 KubernetesClusterClientFactory 创建 KubernetesClusterDescriptor
            final ClusterSpecification clusterSpecification =
                    clientFactory.getClusterSpecification(configuration);
            // KubernetesClusterDescriptor 创建 application 集群
            clusterDescriptor.deployApplicationCluster(
                    clusterSpecification, applicationConfiguration);
        }
    }
}

public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
    // 省略...
    @Override
    public ClusterClientProvider<String> deployApplicationCluster(
            final ClusterSpecification clusterSpecification,
            final ApplicationConfiguration applicationConfiguration)
            throws ClusterDeploymentException {
        // 省略...
        // 指定集群入口 KubernetesApplicationClusterEntrypoint 部署/启动集群
        final ClusterClientProvider<String> clusterClientProvider =
                deployClusterInternal(
                        KubernetesApplicationClusterEntrypoint.class.getName(),
                        clusterSpecification,
                        false);
        // 省略...
    }


    private ClusterClientProvider<String> deployClusterInternal(
            String entryPoint, ClusterSpecification clusterSpecification, boolean detached)
            throws ClusterDeploymentException {
        // 省略...
        // 设置集群配置,例如启动入口entry、blobserver端口、taskmanager rpc端口、rest端口等等
        flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint);

        // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values.
        KubernetesUtils.checkAndUpdatePortConfigOption(
                flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);
        KubernetesUtils.checkAndUpdatePortConfigOption(
                flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);
        KubernetesUtils.checkAndUpdatePortConfigOption(
                flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);
        // 省略...

        // 配置 JobManager 的 PodTemplate
        try {
            final KubernetesJobManagerParameters kubernetesJobManagerParameters =
                    new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);

            final FlinkPod podTemplate =
                    kubernetesJobManagerParameters
                            .getPodTemplateFilePath()
                            .map(
                                    file ->
                                            KubernetesUtils.loadPodFromTemplateFile(
                                                    client, file, Constants.MAIN_CONTAINER_NAME))
                            .orElse(new FlinkPod.Builder().build());
            // 配置 JobManager 的 Deployment
            // 配置 Deployment 的过程中,利用 CmdJobManagerDecorator 设置 JobManager main container 的启动命令,即 kubernetes-jobmanager.sh kubernetes-application
            final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
                    KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
                            podTemplate, kubernetesJobManagerParameters);

            client.createJobManagerComponent(kubernetesJobManagerSpec);

            return createClusterClientProvider(clusterId);
            // 省略...
        } 
    }
}

public class Fabric8FlinkKubeClient implements FlinkKubeClient {
    @Override
    public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) {
        final Deployment deployment = kubernetesJMSpec.getDeployment();
        // 省略...
        // 利用  Fabric8 Kubernetes Client 创建 JobManager 的 deployment
        this.internalClient.resourceList(accompanyingResources).createOrReplace();
    }
}

2.3.3 容器内启动集群

public final class KubernetesApplicationClusterEntrypoint extends ApplicationClusterEntryPoint {
    // 省略...
    public static void main(final String[] args) {
        // 省略...
        // 设置作业配置
        PackagedProgram program = null;
        try {
            program = getPackagedProgram(configuration);
        } catch (Exception e) {
            LOG.error("Could not create application program.", e);
            System.exit(1);
        }

        try {
            configureExecution(configuration, program);
        } catch (Exception e) {
            LOG.error("Could not apply application configuration.", e);
            System.exit(1);
        }

        final KubernetesApplicationClusterEntrypoint kubernetesApplicationClusterEntrypoint =
                new KubernetesApplicationClusterEntrypoint(configuration, program);
        // 利用 helper 启动集群
        ClusterEntrypoint.runClusterEntrypoint(kubernetesApplicationClusterEntrypoint);
    }
}

    private void runCluster(Configuration configuration, PluginManager pluginManager)
            throws Exception {
        synchronized (lock) {
            //  初始化 rpcserver、haservice、blobserver等
            initializeServices(configuration, pluginManager);
            //  省略...
            //  DispatcherResourceManagerComponent,其封装Dispatcher、ResourceManager和WebMonitorEndpoint
            final DispatcherResourceManagerComponentFactory
                    dispatcherResourceManagerComponentFactory =
                            createDispatcherResourceManagerComponentFactory(configuration);
            //  内部使用DispatcherRunnerFactory创建DispatcherRunner
            // 接着Dispatcher选主的时候,DefaultDispatcherRunner.grantLeadership() 启动新 dispatcher leader即startNewDispatcherLeaderProcess(),DispatcherLeaderProcess.start()会利用JobDispatcherLeaderProcess.create()创建ApplicationDispatcherBootstrap,最终调用ApplicationDispatcherBootstrap.runApplicationAsync()执行用户作业的main函数
            clusterComponent =
                    dispatcherResourceManagerComponentFactory.create(
                            configuration,
                            ioExecutor,
                            commonRpcService,
                            haServices,
                            blobServer,
                            heartbeatServices,
                            metricRegistry,
                            executionGraphInfoStore,
                            new RpcMetricQueryServiceRetriever(
                                    metricRegistry.getMetricQueryServiceRpcService()),
                            this);
            //  省略...
        }
    }

当 Dispatcher 选择主节点的时候,DefaultDispatcherRunner.grantLeadership() -> DefaultDispatcherRunner.startNewDispatcherLeaderProcess() -> DispatcherLeaderProcess.start() -> JobDispatcherLeaderProcess.create()创建ApplicationDispatcherBootstrap -> ApplicationDispatcherBootstrap.runApplicationAsync() -> ... -> ClientUtils.executeProgram() 调用作业的 main函数

说明:Dispatcher 选主是利用了 Kubernetes Client 的 LeaderElector,通过 KubernetesLeaderElector 封装 LeaderElector,最终利用 LeaderElectionEventHandler 处理选主的回调任务,其样例如下所示。

public class LeaderElectionExample {
    public static void main(String[] args) throws Exception {
        ApiClient client = Config.defaultClient();
        Configuration.setDefaultApiClient(client);
        String lockHolderIdentityName = InetAddress.getLocalHost().getHostAddress();
        // 创建 ConfigMap 锁
        ConfigMapLock lock = new ConfigMapLock( "default", "leader-election-ip", lockHolderIdentityName);
        // Leader 选举的配置
        LeaderElectionConfig leaderElectionConfig =
                new LeaderElectionConfig(lock,
                        Duration.ofMillis(10000),
                        Duration.ofMillis(8000),
                        Duration.ofMillis(2000));

        // 初始化 LeaderElector
        LeaderElector leaderElector = new LeaderElector(leaderElectionConfig);
        // 选举 Leader
        leaderElector.run(
                () -> {
                    System.out.println("Do something when getting leadership.");
                },
                () -> {
                    System.out.println("Do something when losing leadership.");
                });
    }
}

2.3.4 ApplicationDispatcherBootstrap 启动作业

Dispatcher 通过 ApplicationDispatcherBootstrap 利用异步线程和反射机制,执行作业的 mian 函数,并且使用轮训的方式不断查询作业的状态,执行步骤如下:

步骤 1:通过 ThreadLocal 控制 Context 对象,在外部创建好 applicationJobIds 的引用列表并且层层传入,然后利用反射执行用户 main 函数;

步骤 2:在 main 函数中通过执行 execute 或 executeAysnc 生成流图并提交作业,接着把作业 ID 保存到 submitJobIdsapplicationJobIds,因此 ApplicationDispatcherBootstrap 可以获取提交的 jobId

步骤 3:循环每个作业 ID 查询其状态是否为结束状态。如果没有结束,则一直轮训状态;如果全部结束,则退出并关闭集群。

image.png

2.3.5 申请资源启动 TaskManager

说明KubernetesResourceManagerDriver.requestResource 通过 Kubernetes 申请资源启动 TaskManager。

有关【Flink on k8s】Native Kubernetes Application 部署模式详解的更多相关文章

  1. ruby-on-rails - Rails - 子类化模型的设计模式是什么? - 2

    我有一个模型:classItem项目有一个属性“商店”基于存储的值,我希望Item对象对特定方法具有不同的行为。Rails中是否有针对此的通用设计模式?如果方法中没有大的if-else语句,这是如何干净利落地完成的? 最佳答案 通常通过Single-TableInheritance. 关于ruby-on-rails-Rails-子类化模型的设计模式是什么?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.co

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby - 如何在续集中重新加载表模式? - 2

    鉴于我有以下迁移:Sequel.migrationdoupdoalter_table:usersdoadd_column:is_admin,:default=>falseend#SequelrunsaDESCRIBEtablestatement,whenthemodelisloaded.#Atthispoint,itdoesnotknowthatusershaveais_adminflag.#Soitfails.@user=User.find(:email=>"admin@fancy-startup.example")@user.is_admin=true@user.save!ende

  4. ruby-on-rails - 每次我尝试部署时,我都会得到 - (gcloud.preview.app.deploy) 错误响应 : [4] DEADLINE_EXCEEDED - 2

    我是Google云的新手,我正在尝试对其进行首次部署。我的第一个部署是RubyonRails项目。我基本上是在关注thisguideinthegoogleclouddocumentation.唯一的区别是我使用的是我自己的项目,而不是他们提供的“helloworld”项目。这是我的app.yaml文件runtime:customvm:trueentrypoint:bundleexecrackup-p8080-Eproductionconfig.ruresources:cpu:0.5memory_gb:1.3disk_size_gb:10当我转到我的项目目录并运行gcloudprevie

  5. ruby - 是否有用于序列化和反序列化各种格式的对象层次结构的模式? - 2

    给定一个复杂的对象层次结构,幸运的是它不包含循环引用,我如何实现支持各种格式的序列化?我不是来讨论实际实现的。相反,我正在寻找可能会派上用场的设计模式提示。更准确地说:我正在使用Ruby,我想解析XML和JSON数据以构建复杂的对象层次结构。此外,应该可以将该层次结构序列化为JSON、XML和可能的HTML。我可以为此使用Builder模式吗?在任何提到的情况下,我都有某种结构化数据-无论是在内存中还是文本中-我想用它来构建其他东西。我认为将序列化逻辑与实际业务逻辑分开会很好,这样我以后就可以轻松支持多种XML格式。 最佳答案 我最

  6. ruby-on-rails - Ruby on Rails 可以部署在 Azure 网站上吗? - 2

    我可以在Azure网站上部署RubyonRails吗? 最佳答案 还没有。目前仅支持.NET和PHP。 关于ruby-on-rails-RubyonRails可以部署在Azure网站上吗?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/12964010/

  7. jenkins部署1--jenkins+gitee持续集成 - 2

    前置步骤我们都操作完了,这篇开始介绍jenkins的集成。话不多说,看操作1、登录进入jenkins后会让你选择安装插件,选择第一个默认的就行。安装完成后设置账号密码,重新登录。2、配置JDK和Git都需要执行路径,所以需要先把执行路径找到,先进入服务器的docker容器,2.1JDK的路径root@69eef9ee86cf:/usr/bin#echo$JAVA_HOME/usr/local/openjdk-82.2Git的路径root@69eef9ee86cf:/#whichgit/usr/bin/git3、先配置JDK和Git。点击:ManageJenkins>>GlobalToolCon

  8. 深度学习部署:Windows安装pycocotools报错解决方法 - 2

    深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal

  9. ruby-on-rails - environment.rb 中设置的常量在开发模式中消失 - 2

    了解Rails缓存如何工作的人可以真正帮助我。这是嵌套在Rails::Initializer.runblock中的代码:config.after_initializedoSomeClass.const_set'SOME_CONST','SOME_VAL'end现在,如果我运行script/server并发出请求,一切都很好。然而,在我的Rails应用程序的第二个请求中,一切都因单元化常量错误而变得糟糕。在生产模式下,我可以成功发出第二个请求,这意味着常量仍然存在。我已通过将以上内容更改为以下内容来解决问题:config.after_initializedorequire'some_cl

  10. Ruby,使用包含 TK GUI 的 ocra 部署一个 exe - 2

    Ocra无法处理需要“tk”的应用程序require'tk'puts'nope'用奥克拉http://github.com/larsch/ocra不起作用(如链接中的一个问题所述)问题:https://github.com/larsch/ocra/issues/29(Ocra是1.9的"new"rubyscript2exe,本质上它用于将rb脚本部署为可执行文件)唯一的问题似乎是缺少tcl的DLL文件我不认为这是一个问题据我所知,问题是缺少tk的DLL文件如果它们是已知的,则可以在执行ocra时将它们包括在内有没有办法知道tk工作所需的DLL依赖项? 最佳答

随机推荐