草庐IT

大数据Flink进阶(十六):Flink HA搭建配置

Lansonli 2023-07-17 原文

文章目录

Flink HA搭建配置

一、Flink基于Standalone HA

1、Standalone HA配置

2、启动测试

二、Flink 基于Yarn HA

1、Yarn HA配置

2、启动测试


Flink HA搭建配置

默认情况下,每个Flink集群只有一个JobManager,这将导致单点故障(SPOF,single point of failure),如果这个JobManager挂了,则不能提交新的任务,并且运行中的程序也会失败,这是我们可以对JobManager做高可用(High Availability,简称HA),JobManager HA集群当Active JobManager节点挂掉后可以切换其他Standby JobManager成为主节点,从而避免单点故障。用户可以在Standalone、Flink on Yarn、Flink on K8s集群模式下配置Flink集群HA,Flink on K8s集群模式下的HA将单独在K8s里介绍。

一、Flink基于Standalone HA

Standalone模式下,JobManager的高可用性的基本思想是,任何时候都有一个Alive JobManager和多个Standby JobManager。Standby JobManager可以在Alive JobManager挂掉的情况下接管集群成为Alive JobManager,这样避免了单点故障,一旦某一个Standby JobManager接管集群,程序就可以继续运行。Standby JobManagers和Alive JobManager实例之间没有明确区别,每个JobManager都可以成为Alive或Standby。

1、Standalone HA配置

Standalone集群部署下实现JobManager HA 需要依赖ZooKeeper和HDFS,Zookeeper负责协调JobManger失败后的自动切换,HDFS中存储每个Flink任务的执行流程数据,因此要有一个ZooKeeper集群和Hadoop集群。这里我们选择3台节点作为Flink的JobManger,如下:

节点IP节点名称JobManagerTaskManager
192.168.179.4node1
192.168.179.5node2
192.168.179.6node3

以上node1、node2、node3都是JobManager,同时只能有1个JobManager为Active主节点,其他为StandBy备用节点,配置JobManager HA 步骤如下:

  • 所有Flink 节点配置 hadoop classpath

由于Flink JobManager HA 中需要连接HDFS存储job数据,所以Flink所有节点必须配置hadoop classpath 环境变量,在node1-3所有节点上配置/etc/profile配置环境变量:

#配置/etc/profile
export HADOOP_CLASSPATH=`hadoop classpath`

#执行生效
source /etc/profile
  • 配置masters文件

需要在所有Flink集群节点上配置$FLINK_HOME/conf/master文件,配置上所有的JobManager节点信息:

#node1,node2,node3节点上配置$FLINK_HOME/conf/master文件
node1:8081
node2:8081
node3:8081
  • 配置flink-conf.yaml

需要在所有Flink集群节点上配置$FLINK_HOME/conf/flink-conf.yaml文件,这里在node1-3节点上配置,配置内容如下:

#要启用高可用,选主协调者为zookeeper,zk存储一些ck记录及选举信息
high-availability: zookeeper

#storageDir存储恢复JobManager失败所需的所有元数据,如:job dataflow信息
high-availability.storageDir: hdfs://mycluster/flink-standalone-ha/

#分布式协调器zookeeper集群
high-availability.zookeeper.quorum: node3:2181,node4:2181,node5:2181

#根ZooKeeper节点,所有集群节点都位于根节点之下。
high-availability.zookeeper.path.root: /flink-standalone-ha

#给当前集群指定cluster-id,集群所有需要的协调数据都放在该节点下。
high-availability.cluster-id: /standalone-cluster

2、启动测试

Standalone HA 配置完成后,按照如下步骤进行测试:

  • 启动Zookeeper ,启动 HDFS
#在 node3、node4、node5节点启动zookeeper
[root@node3 ~]#  zkServer.sh start
[root@node4 ~]#  zkServer.sh start
[root@node5 ~]#  zkServer.sh start

#在node1启动HDFS集群
[root@node1 ~]# start-all.sh
  • 启动Flink Standalone HA 集群
#在node1 节点启动Flink Standalone HA集群
[root@node1 ~]# cd /software/flink-1.16.0/bin/
[root@node1 bin]# ./start-cluster.sh
Starting HA cluster with 3 masters.
Starting standalonesession daemon on host node1.
Starting standalonesession daemon on host node2.
Starting standalonesession daemon on host node3.
Starting taskexecutor daemon on host node1.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.

 启动Standaloe集群时同时会在node2、node3节点上启动JobManager。

  • 访问Flink WebUI

登录Flink WebUI (https://node1:8081/https://node2:8081/https://node3:8081),无论登录node1,node2,node3节点任意一台节点的WebUI 页面都相同:

在WebUI中无法看到哪个节点是Active JobManager,我们也可以通过zookeeper查看当前Active JobManager节点,命令如下:

#登录zookeeper 客户端
[root@node5 ~]# zkCli.sh

#查看对应节点路径信息
[zk: localhost:2181(CONNECTED) 1] get /flink-standalone-ha/standalone-cluster/leader/dispatcher/connection_info 
...w42akka.tcp://flink@node1:33274/user/rpc/dispatcher_1srjava.util.UUID...
  • 测试JobManager切换

我们可以在Flink Standalone集群中提交一个Flink 任务,提交之后无论在通过哪个节点的8081WebUI都可以看到此任务。提交任务命令如下:

#在node5节点启动 socket服务
[root@node5 ~]# nc -lk 9999

#在node4客户端向Standalone集群提交任务
[root@node4 ~]# cd /software/flink-1.16.0/bin
[root@node4 bin]# ./flink run -m node1:8081 -d -c com.mashibing.flinkjava.code.chapter3.SocketWordCount /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar

通过https://node1:8081、https://node2:8081、https://node3:8081 WebUI都可以看到提交的任务信息:

在HDFS中也可以看到提交的任务信息:

将node1节点上的JobManager进程kill掉,查看Active JobManager是否变化:

#kill node1 JobManager进程
[root@node1 bin]# jps
...
16309 StandaloneSessionClusterEntrypoint
...
[root@node1 bin]# kill -9 16309

将Active JobManager kill之后访问各个节点的WebUI可以看到短暂的不可用,稍等一会就可以看到正常可以访问除node1之外的其他节点WebUI,通过查询Zookeeper中节点信息,可以看到Active JobManager 节点切换成了其他节点:

#zookeeper查询命令
[zk: localhost:2181(CONNECTED) 1] get /flink-standalone-ha/standalone-cluster/leader/dispatcher/connection_info
...w42akka.tcp://flink@node2:35581/user/rpc/dispatcher_1srjava.util.UUID...

通过以上测试Flink Standalone HA 生效,如果想要把在node1上kill掉的JobManager启动起来,需要手动执行如下命令:

#在node1启动JobManager
[root@node1 bin]# ./jobmanager.sh start

被kill的JobManager重新启动后作为备用的JobManager也可以访问WebUI查看集群中执行的任务。

二、Flink 基于Yarn HA

正常基于Yarn提交Flink程序,无论使用哪种模式提交任务都会启动JobManager角色,JobManager角色是哪个进程可以通过Yarn WebUI查看对应的ApplicationID启动所在节点的对应进程, Yarn Session提交任务模式中该角色进程为"YarnSessionClusterEntrypoint"、Yarn Per-Job提交任务模式中该角色进程为"YarnJobClusterEntrypoint"、Yarn Application提交任务模式中该角色进程为"YarnApplicationClusterEntryPoint"。

当JobManager进程挂掉后,也就是Yarn Application任务失败后默认不会进行任务重试,所以Flink 基于Yarn JobManager HA的本质是当Yarn Application程序失败后重试启动JobManager,实际上就是通过配置Yarn重试次数来实现高可用。JobManager重试过程需要借助zookeeper 协调JobManger失败后的切换,进而进行恢复对应的任务,同时需要HDFS存储每个Flink任务的执行流程数据。

1、Yarn HA配置

Yarn HA配置步骤如下:

  • Hadoop 中所有节点的 yarn-site.xml

在所有Hadoop节点上配置$HADOOP_HOME/etc/hadoop/yarn-site.xml文件,配置应用程序失败后最大尝试次数,以下该参数默认值为2,不配置也可以:

#设置提交应用程序的最大尝试次数,建议不低于4,这里重试的是ApplicationMaster
<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
</property>
  • 配置flink-conf.yaml文件 

只需要在向Yarn提交任务的客户端节点上配置Flink的flink-conf.yaml文件。未来我们在node5节点上来基于Yarn 各种模式提交任务,所以这里我们在node5节点上配置$FLINK_HOME/conf/flink-conf.yaml文件,配置内容如下:

#要启用高可用,选主协调者为zookeeper,zk存储一些ck记录及选举信息
high-availability: zookeeper

#storageDir存储恢复JobManager失败所需的所有元数据,如:job dataflow信息
high-availability.storageDir: hdfs://mycluster/flink-yarn-ha/

#分布式协调器zookeeper集群
high-availability.zookeeper.quorum: node3:2181,node4:2181,node5:2181

#根ZooKeeper节点,所有集群节点都位于根节点之下。
high-availability.zookeeper.path.root: /flink-yarn-ha

#给当前集群指定cluster-id,集群所有需要的协调数据都放在该节点下。
high-availability.cluster-id: /yarn-cluster

#该参数同yarn-site.xml中yarn.resourcemanager.am.max-attempts参数,指向yarn提交一个application重试的次数,也可以不设置,非高可用默认为1,高可用默认为2,建议不大于yarn.resourcemanager.am.max-attempts参数,否则会被yarn.resourcemanager.am.max-attempts替换掉。
yarn.application-attempts: 4

2、启动测试

  • 启动 Zookeeper 和 HDFS
#在 node3、node4、node5节点启动zookeeper
[root@node3 ~]#  zkServer.sh start
[root@node4 ~]#  zkServer.sh start
[root@node5 ~]#  zkServer.sh start

#在node1启动HDFS集群
[root@node1 ~]# start-all.sh
  •  node5 节点向 Yarn 提交任务

这里以在node5节点上以Yarn Application模式提交任务为例,命令如下:

#在node5节点启动 socket服务
[root@node5 ~]# nc -lk 9999

#以Application模式提交任务,命令如下
[root@node5 ~]# cd /software/flink-1.16.0/bin/
[root@node5 bin]# ./flink run-application -t yarn-application -c com.mashibing.flinkjava.code.chapter3.SocketWordCount /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar

以上任务提交后可以在Yarn WebUI中看到对应的Application信息:

  • 测试Flink Yarn HA

在Yarn WebUI中进入到FlinkWebUi页面,查看该JobManager启动所在的节点:

进入JobManager所在节点,并kill对应的JobManager进程,模拟JobManager进程意外中断,在Yarn WebUI中可以看到对应的Yarn ApplicationID重试执行,点击该ApplicatID 可以看到该任务重试信息:

通过以上测试,Flink Yarn HA 生效。


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

有关大数据Flink进阶(十六):Flink HA搭建配置的更多相关文章

  1. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  2. ruby-on-rails - 独立 ruby​​ 脚本的配置文件 - 2

    我有一个在Linux服务器上运行的ruby​​脚本。它不使用rails或任何东西。它基本上是一个命令行ruby​​脚本,可以像这样传递参数:./ruby_script.rbarg1arg2如何将参数抽象到配置文件(例如yaml文件或其他文件)中?您能否举例说明如何做到这一点?提前谢谢你。 最佳答案 首先,您可以运行一个写入YAML配置文件的独立脚本:require"yaml"File.write("path_to_yaml_file",[arg1,arg2].to_yaml)然后,在您的应用中阅读它:require"yaml"arg

  3. Ruby Sinatra 配置用于生产和开发 - 2

    我已经在Sinatra上创建了应用程序,它代表了一个简单的API。我想在生产和开发上进行部署。我想在部署时选择,是开发还是生产,一些方法的逻辑应该改变,这取决于部署类型。是否有任何想法,如何完成以及解决此问题的一些示例。例子:我有代码get'/api/test'doreturn"Itisdev"end但是在部署到生产环境之后我想在运行/api/test之后看到ItisPROD如何实现? 最佳答案 根据SinatraDocumentation:EnvironmentscanbesetthroughtheRACK_ENVenvironm

  4. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  5. ruby - 我如何添加二进制数据来遏制 POST - 2

    我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_

  6. 世界前沿3D开发引擎HOOPS全面讲解——集3D数据读取、3D图形渲染、3D数据发布于一体的全新3D应用开发工具 - 2

    无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD

  7. FOHEART H1数据手套驱动Optitrack光学动捕双手运动(Unity3D) - 2

    本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01  客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02  数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit

  8. Vscode+Cmake配置并运行opencv环境(Windows和Ubuntu大同小异) - 2

    之前在培训新生的时候,windows环境下配置opencv环境一直教的都是网上主流的vsstudio配置属性表,但是这个似乎对新生来说难度略高(虽然个人觉得完全是他们自己的问题),加之暑假之后对cmake实在是爱不释手,且这样配置确实十分简单(其实都不需要配置),故斗胆妄言vscode下配置CV之法。其实极为简单,图比较多所以很长。如果你看此文还配不好,你应该思考一下是不是自己的问题。闲话少说,直接开始。0.CMkae简介有的人到大二了都不知道cmake是什么,我不说是谁。CMake是一个开源免费并且跨平台的构建工具,可以用简单的语句来描述所有平台的编译过程。它能够根据当前所在平台输出对应的m

  9. 使用canal同步MySQL数据到ES - 2

    文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co

  10. ruby-on-rails - 创建 ruby​​ 数据库时惰性符号绑定(bind)失败 - 2

    我正在尝试在Rails上安装ruby​​,到目前为止一切都已安装,但是当我尝试使用rakedb:create创建数据库时,我收到一个奇怪的错误:dyld:lazysymbolbindingfailed:Symbolnotfound:_mysql_get_client_infoReferencedfrom:/Library/Ruby/Gems/1.8/gems/mysql2-0.3.11/lib/mysql2/mysql2.bundleExpectedin:flatnamespacedyld:Symbolnotfound:_mysql_get_client_infoReferencedf

随机推荐