草庐IT

大数据Flink进阶(十):Flink集群部署

Lansonli 2023-04-07 原文

文章目录

一、Standalone集群部署

1、节点划分

2、standalone集群部署

3、任务提交测试

1、Flink不同版本与Hadoop整合

2、Flink on Yarn 配置及环境准备

3、任务提交测试


Flink的安装和部署主要分为本地(单机)模式和集群模式,其中本地模式只需直接解压就可以使用,不用修改任何参数,一般在做一些简单测试的时候使用。本地模式在这里不再赘述。集群部署模式主要包含Standalone、Hadoop Yarn 、Kubernetes等,Flink可以借助以上资源管理器来实现分布式计算,目前企业使用最多的是Flink 基于Hadoop Yarn资源管理器模式,下面我们重点讲解Flink 基于Standalone集群、Yarn资源管理器以及Kubernetes集群部署方式。

一、Standalone集群部署

1、节点划分

通过Flink运行时架构小结,我们知道Flink集群是由一个JobManager(Master)节点和多个TaskManager(Worker)节点构成,并且有对应提交任务的客户端。这里部署Standalone集群基于Linux Centos7.6版本,选择4台节点进行部署Flink,其中3台节点Standalone集群节点、一台节点是提交Flink任务的客户端,各个节点需要满足以下特点:

  • 各节点安装java8版本及以上jdk(这里选择jdk8)。
  • 各个节点之间需要两两免密。

4台节点角色划分如下:

节点IP节点名称Flink服务
192.168.179.4node1JobManager,TaskManager
192.168.179.5node2TaskManager
192.168.179.6node3TaskManager
192.168.179.7node4client

2、standalone集群部署

我们可以从Flink的官网下载Flink最新的安装包,这里选择Flink1.16.0版本,Flink安装包下载地址:https://flink.apache.org/downloads.html#apache-flink-1160。Standalone集群部署步骤如下:

上传压缩包解压

将Flink的安装包上传到node1节点/software下并解压:

[root@node1 software]# tar -zxvf ./flink-1.16.0-bin-scala_2.12.tgz

配置Master节点

配置Master节点就是配置JobManager节点,在$FLINK_HOME/conf/masters文件中配置jobManager节点如下:

#vim $FLINK_HOME/conf/masters
node1:8081

配置Worker节点

配置Worker节点就是配置TaskManager节点,在$FLINK_HOME/conf/workers文件中配置taskManager节点如下:

#vim $FLINK_HOME/conf/workers
node1
node2
node3

配置 flink-conf.yaml 文件

在node1节点上进入到FLINK_HOME/conf目录下,配置flink−conf.yaml文件(vimFLINK_HOME/conf/flink-conf.yaml配置如下内容),内容如下:

# JobManager地址
jobmanager.rpc.address: node1

# JobManager地址绑定设置
jobmanager.bind-host: 0.0.0.0

# TaskManager地址绑定设置
taskmanager.bind-host: 0.0.0.0

# TaskManager地址(不同TaskManager节点host配置对应的host)
taskmanager.host: node1

# 设置每个TaskManager 的slot个数
taskmanager.numberOfTaskSlots: 3

# WEB UI 节点(只需JobManager节点设置,TaskManager节点设置了也无所谓)
rest.address: node1

# WEB UI节点绑定设置(只需JobManster节点设置)
rest.bind-address: 0.0.0.0

注意:以上设置的0.0.0.0代表监听当前节点每一个可用的网络接口,0.0.0.0不再是一个真正意义上的ip地址,而表示一个集合,监听0.0.0.0的端口相当于是可以监听本机中的所有ip端口。以上配置的0.0.0.0 表示想要让外部访问需要设置具体ip,或者直接设置为"0.0.0.0"。

分发安装包并配置 node2  node3 节点 flink-conf.yaml 文件

#分发到node2、node3节点上
[root@node1 ~]# scp -r /software/flink-1.16.0 node2:/software/
[root@node1 ~]# scp -r /software/flink-1.16.0 node3:/software/

#修改node2、node3 节点flink-conf.yaml文件中的TaskManager
【node2节点】 taskmanager.host: node2
【node3节点】 taskmanager.host: node3

#注意,这里发送到node4,node4只是客户端
[root@node1 ~]# scp -r /software/flink-1.16.0 node4:/software/

启动Flink 集群

#在node1节点中,启动Flink集群
[root@node1 ~]# cd /software/flink-1.16.0/bin/
[root@node1 bin]# ./start-cluster.sh

访问Flink WebUI

https://node1:8081,进入页面如下:

 

3、任务提交测试

Standalone集群搭建完成后,可以将Flink任务提交到Flink Standalone集群中运行。有两种方式提交Flink任务,一种是在WebUI界面上提交Flink任务,一种方式是通过命令行方式。

这里编写读取Socket数据进行实时WordCount统计Flink任务提交到Flink集群中运行,这里以Flink Java代码为例来实现,代码如下:

/**
 * 读取Socket数据进行实时WordCount统计
 */
public class SocketWordCount {
    public static void main(String[] args) throws Exception {
        //1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.读取Socket数据
        DataStreamSource<String> ds = env.socketTextStream("node5", 9999);
        //3.准备K,V格式数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = ds.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            String[] words = line.split(",");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        //4.聚合打印结果
        tupleDS.keyBy(tp -> tp.f0).sum(1).print();

        //5.execute触发执行
        env.execute();
    }
}

以上代码编写完成后,在对应的项目Maven pom 文件中加入以下plugin:

<build>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>2.6</version>
      <configuration>
        <!-- 设置false后是去掉 xxx-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
        <!--<appendAssemblyId>false</appendAssemblyId>-->
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass>xx.xx.xx</mainClass>
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>assembly</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

然后使用Maven assembly 插件对项目进行打包,得到"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar"完整jar包。

此外,代码中读取的是node5节点scoket 9999端口数据,需要在node5节点上安装nc组件:

[root@node5 ~]# yum -y install nc
  • 命令行提交Flink任务

node1 上启动 Flink Standalone 集群

[root@node1 bin]# cd /software/flink-1.16.0/bin/
[root@node1 bin]# ./start-cluster.sh

node5 节点上启动 nc socket 服务

[root@node5 ~]# nc -lk 9999

将打好的包提交到Flink 客户端 node4 节点 /root 目录下并提交任务

[root@node4 ~]# cd /software/flink-1.16.0/bin/
#向Flink集群中提交任务
[root@node4 bin]# ./flink run -m node1:8081 -c com.mashibing.flinkjava.code.lesson03.SocketWordCount /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar

进入Flink WebUI 界面查看任务和结果

#向node5 socket 9999 端口写入以下数据
hello,a 
hello,b
hello,c
hello,a

WebUI查看对应任务和结果

登录Flink WebUI http://node1:8081查看对应任务执行情况。

WebUI查看执行结果:

在WebUI中点击对应的任务Job,进入如下页面点击"Cancel Job"取消任务执行:

  • Web界面提交 Flink 任务

向Flink集群提交任务还可以通过WebUI方式提交。点击上传jar包,进行参数配置,并提交任务。

提交任务之后,可以通过WebUI页面查看提交任务,输入数据之后可以在对应的TaskManager节点上看到相应结果。

Flink可以基于Yarn来运行任务,Yarn作为资源提供方,可以根据Flink任务资源需求动态的启动TaskManager来提供资源。Flink基于Yarn提交任务通常叫做Flink On Yarn,Yarn资源调度框架运行需要有Hadoop集群,Hadoop版本最低是2.8.5。

1、Flink不同版本与Hadoop整合

Flink基于Yarn提交任务时,需要Flink与Hadoop进行整合。Flink1.8版本之前,Flink与Hadoop整合是通过Flink官方提供的基于对应hadoop版本编译的安装包来实现,例如:flink-1.7.2-bin-hadoop24-scala_2.11.tgz,在Flink1.8版本后不再支持基于不同Hadoop版本的编译安装包,Flink与Hadoop进行整合时,需要在官网中下载对应的Hadoop版本的"flink-shaded-hadoop-2-uber-x.x.x-x.x.jar"jar包,然后后上传到提交Flink任务的客户端对应的$FLINK_HOME/lib中完成Flink与Hadoop的整合。

在Flink1.11版本之后不再提供任何更新的flink-shaded-hadoop-x jars,Flink与Hadoop整合统一使用基于Hadoop2.8.5编译的Flink安装包,支持与Hadoop2.8.5及以上Hadoop版本(包括Hadoop3.x)整合。在Flink1.11版本后与Hadoop整合时还需要配置HADOOP_CLASSPATH环境变量来完成对Hadoop的支持。

2、Flink on Yarn 配置及环境准备

Flink 基于Yarn提交任务,向Yarn集群中提交Flink任务的客户端需要满足以下两点

  • 客户端安装了Hadoop2.8.5+版本的hadoop。
  • 客户端配置了HADOOP_CLASSPATH环境变量。

这里选择node5节点作为提交Flink的客户端,该节点已经安装了Hadoop3.3.4版本,然后在该节点中配置profile文件,加入以下环境变量:

# vim /etc/profile,加入以下配置
export HADOOP_CLASSPATH=`hadoop classpath`

#source /etc/profile 使环境变量生效
[root@node5 ~]# source /etc/profile

然后将Flink的安装包上传到node5节点/software下并解压:

[root@node5 software]# tar -zxvf ./flink-1.16.0-bin-scala_2.12.tgz

3、任务提交测试

基于Yarn运行Flink任务只能通过命令行方式进行任务提交,Flink任务基于Yarn运行时有几种任务提交部署模式(后续章节会进行介绍),下面以Application模式来提交任务。步骤如下:

  • 启动HDFS集群
#在 node3、node4、node5节点启动zookeeper
[root@node3 ~]#  zkServer.sh start
[root@node4 ~]#  zkServer.sh start
[root@node5 ~]#  zkServer.sh start

#在node1启动HDFS集群
[root@node1 ~]# start-all.sh
  • 将 Flink 任务对应的 jar 包上传到 node5 节点

这里的Flink任务还是以读取Socket数据做实时WordCount任务为例,将打好的"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar"jar包上传到node5节点的/root/目录下。

  • node5 节点执行如下命令运行 Flink 作业
[root@node5 ~]# cd /software/flink-1.16.0/bin/

# 提交Flink任务
[root@node5 bin]#./flink run-application -t yarn-application -c com.mashibing.flinkjava.code.chapter3.SocketWordCount /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
  • 查看WebUI及运行结果

Flink任务Application模式提交后,浏览器输入https://node1:8088登录Yarn WebUI,找到提交的任务,点击对应的Tracking UI"ApplicationMaster"进入到Flink WEBUI任务页面。

向node5 scoket 9999端口输入以下数据并在对应的WebUI中查看结果:

#向node5 socket 9999 端口写入以下数据
hello,a 
hello,b
hello,c
hello,a

在WebUI中找到对应的Flink TaskManager节点 Stdout输出,结果如下:



  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

有关大数据Flink进阶(十):Flink集群部署的更多相关文章

  1. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  2. ruby-on-rails - 每次我尝试部署时,我都会得到 - (gcloud.preview.app.deploy) 错误响应 : [4] DEADLINE_EXCEEDED - 2

    我是Google云的新手,我正在尝试对其进行首次部署。我的第一个部署是RubyonRails项目。我基本上是在关注thisguideinthegoogleclouddocumentation.唯一的区别是我使用的是我自己的项目,而不是他们提供的“helloworld”项目。这是我的app.yaml文件runtime:customvm:trueentrypoint:bundleexecrackup-p8080-Eproductionconfig.ruresources:cpu:0.5memory_gb:1.3disk_size_gb:10当我转到我的项目目录并运行gcloudprevie

  3. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  4. ruby - 我如何添加二进制数据来遏制 POST - 2

    我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_

  5. 世界前沿3D开发引擎HOOPS全面讲解——集3D数据读取、3D图形渲染、3D数据发布于一体的全新3D应用开发工具 - 2

    无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD

  6. ruby-on-rails - Ruby on Rails 可以部署在 Azure 网站上吗? - 2

    我可以在Azure网站上部署RubyonRails吗? 最佳答案 还没有。目前仅支持.NET和PHP。 关于ruby-on-rails-RubyonRails可以部署在Azure网站上吗?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/12964010/

  7. FOHEART H1数据手套驱动Optitrack光学动捕双手运动(Unity3D) - 2

    本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01  客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02  数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit

  8. 使用canal同步MySQL数据到ES - 2

    文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co

  9. ruby-on-rails - 创建 ruby​​ 数据库时惰性符号绑定(bind)失败 - 2

    我正在尝试在Rails上安装ruby​​,到目前为止一切都已安装,但是当我尝试使用rakedb:create创建数据库时,我收到一个奇怪的错误:dyld:lazysymbolbindingfailed:Symbolnotfound:_mysql_get_client_infoReferencedfrom:/Library/Ruby/Gems/1.8/gems/mysql2-0.3.11/lib/mysql2/mysql2.bundleExpectedin:flatnamespacedyld:Symbolnotfound:_mysql_get_client_infoReferencedf

  10. jenkins部署1--jenkins+gitee持续集成 - 2

    前置步骤我们都操作完了,这篇开始介绍jenkins的集成。话不多说,看操作1、登录进入jenkins后会让你选择安装插件,选择第一个默认的就行。安装完成后设置账号密码,重新登录。2、配置JDK和Git都需要执行路径,所以需要先把执行路径找到,先进入服务器的docker容器,2.1JDK的路径root@69eef9ee86cf:/usr/bin#echo$JAVA_HOME/usr/local/openjdk-82.2Git的路径root@69eef9ee86cf:/#whichgit/usr/bin/git3、先配置JDK和Git。点击:ManageJenkins>>GlobalToolCon

随机推荐