主要回答以下问题:

Flink的核心组件包含客户端,jobmanager(JM)和taskmanager™三部分。此外Flink往往还需要结合很多外部组件一起使用,比如高可用服务、持久化存储、资源管理、指标存储与分析的组件。
Flink客户端主要负责将job提交给JM。JM是中央调度器,包含Jobmaster, Dispatcher, ResourceManager三部分。JobMaster is responsible for managing the execution of a single JobGraph. Multiple jobs can run simultaneously in a Flink cluster, each having its own JobMaster. The Dispatcher provides a REST interface to submit Flink applications for execution and starts a new JobMaster for each submitted job. It also runs the Flink WebUI to provide information about job executions. The ResourceManager is responsible for resource de-/allocation and provisioning in a Flink cluster — it manages task slots, which are the unit of resource scheduling in a Flink cluster. TM负责执行具体的任务。
如果只是提交作业和执行作业,不考虑整个集群的稳定性,拓展性,便于维护的性能等,只部署以上三个组件就够了。
但是,如果TM done掉了,JM还可以控制任务重启在其它TM上;如果JM done掉了,所有的任务都将失败,因此我们需要部署高可用服务使得一个JM done掉后,备用的JM 自动地顶上去作业。Flink目前(1.16)仅支持两种高可用服务:Zookeeper HA service 和 K8s HA service.
Flink有故障恢复的机制在任务失败后重启任务,并读取任务失败前的状态在这个状态下继续工作,可以保证哪怕任务失败重启,数据也不丢失,不重发。而这个“任务失败前的状态”是通过checkpoint保存的,考虑到多个JM需要共享checkpoint,checkpoint往往保存在可共享的持久化外部存储系统中,比如HDFS,S3等。因此我们还需要部署文件存储系统。
再说集群的资源管理和调度,Flink支持k8s和YARN两种工具来自动化管理集群资源,也可以不依赖于任何Resource Provider,采用独立部署(standalone)方式部署集群。
再说集群的监控,Flink本身收集了很多指标,可以通过metrics reporter与外部的指标存储、分析、展示工具一起搭建一个Flink监控系统。比如联合Prometheus, grafana搭建监控系统。
在讲解Flink不同的部署方式以及不同部署方式下各组件如何协调工作前,我认为很有必要讲解一下Flink的作业执行机制,便于理解之后会反复提到的JobGraph,task, slots等概念。
程序运行时会被映射为dataflows,每个数据流都是以一个或多个sources开始,一个或多个sinks结束,类似于任意的有向无环图。大多数情况下,程序中的转换运算和dataflow中的算子(operator)是一一对应的关系。
比如下图中的程序就可以转化为由source,map算子,分组聚合算子,sink组成的数据流。


任务并行:不同的任务(算子)并行处理不同的数据,数据流图中横向的同时执行。
数据并行:一个算子可以包含一个或多个子任务,这些子任务在不同的线程、不同的物理机或容器中完全独立地执行。
并行度:一个特定算子的子任务个数,指的数据并行。有些像多线程的线程数,但和多线程不一样的是,多线程的子线程共享内存资源,但是一个算子的子任务运行在不同的slot上,内存资源是隔离的。注意并行度针对的是算子,不同的算子可以设置为不同的并行度。
并行度的设置:
//全局,不推荐
env.setParallelism(1);
//每一个算子
source.map(...).setParallelism(1)
并行度的执行规则:底层实现>代码局部>代码全局设置>提交任务时的命令行设置>配置文件的默认设置
上面介绍了数据流图,算子,并行度的概念,再来说什么是算子链。
Flink中算子与算子之间的数据传输形式大体可以分为以下两类:
one-to-one(forwarding) 直通:
从一个算子到另一个算子的分区不变,比如source和map之间,这代表着map算子的子任务看到的元素的个数和顺序和source算子的子任务产生的相同。map,filter,flatMap都属于这种(前提是并行度不变)
redistributing(重分配):
stream的分区会发生改变,如keyby.
如果前后两个算子并行度相同,且传输方式为one-to-one就可以合并为一个算子链。通常我们说的task就是指的一个算子链,subtask往往指的同一算子链的子任务。
算子合并为算子链是作业执行中很重要的一个优化手段,是否合并是可以通过代码控制的,在作业的性能调优中也是一个可以考虑的调优点。
Flink中之所以合并算子主要考虑的是减少算子之间不必要的数据传输,因为在flink中,不同任务之间的数据传输带来的性能开销其实并不小,一是数据传输必然涉及到序列化和反序列,要是一条数据很大,又选择了不合适的数据类型比如json,那带来的性能损耗是非常明显的;二是如果任务处于不同的taskmanager,那数据传输还涉及到网络传输。另外合并算子也减少了整个job的线程数,能够减少线程转化的开销。
需要注意的是,合并算子并不一定能带来性能提升的,因为算子合并其实相当于减少了并发,可能会影响CPU利用率,可以参考多线程的线程数考虑这一点。
相关概念介绍完后,简单介绍一下(很多细节还未搞明白,但尚不影响使用)一个Flink作业是如何一步步转化为Taskmanager上可以执行的task的。下面的描述主要针对Session部署方式,对于Application部署模式之后再介绍。
首先,客户端会将代码转化为dataflow,dataflow进一步优化如合并算子链后生成JobGraph。

然后,JM对JobGraph根据并行度进行拆分生成执行图,

最后JM会分发执行图到taskmanager上,实际执行的叫物理执行图。
slots是Flink中资源分配的最小单位。Flink对内存资源是进行了隔离的,隔离出来的每一份资源叫一个slot。每个TM通过参数taskmanager.numberOfTaskSlots配置slots的数量。建议根据核的数量分配任务槽,这样一个任务槽就一个cpu核,cpu就不需要分时复用了。默认slots平分整个TM的内存资源,Flink也支持细粒度地划分slots的资源。

需要配置cluster.fine-grained-resource-management.enabled为true
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SlotSharingGroup ssgA = SlotSharingGroup.newBuilder("a")
.setCpuCores(1.0)
.setTaskHeapMemoryMB(100)
.build();
SlotSharingGroup ssgB = SlotSharingGroup.newBuilder("b")
.setCpuCores(0.5)
.setTaskHeapMemoryMB(100)
.build();
someStream.filter(...).slotSharingGroup("a") // Set the slot sharing group with name “a”
.map(...).slotSharingGroup(ssgB); // Directly set the slot sharing group with name and resource.
env.registerSlotSharingGroup(ssgA); // Then register the resource of group “a”

上面讲的是资源的分配,再讲资源的调度:不同的task如何分配到slots上面。 主要遵守下面两个原则: 同一个任务的不同子任务只能分配到不同的slots上;多个任务可以共享slot。以上图为例,一共3个算子链,并行度分别为6,6,1,每个算子链在slots上依次分配,同一个Job的不同算子链共享slot的。
基于这样的资源调度规则,就不难理解“一个job需要的任务槽的数量至少为算子链的最大并行度“。像上面的示例,需要的任务槽数量就是6。
为什么slots可以共享?不同的task资源完全隔离不好吗?这里主要是从提高资源的利用率考虑的,希望各个内存区域的使用相对均衡,而不是忙的忙死闲的闲死。
Flink提供了3种部署模式:
它们的区别主要在于:集群的生命周期以及资源的分配方式;应用的Main方法在哪里执行——客户端还是JobManager。 其中Per-Job模式在1.15版本后已经废弃,就不再介绍了。
先启动集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源。一个任务导致集群崩溃会牵连其他所有任务。
会话模式适合单个规模小、执行时间短的大量作业。(因为执行时间短,所以单个作业占用的资源很快能释放掉给下一个作业使用,不需要反复启动集群,反复部署资源)
应用模式是提交任务的同时启动集群,一个应用一个集群,应用在集群在,应用亡集群自动关闭。此外,应用模式的另一个显著特点是应用的main方法执行在JM,而不是客户端。这样做是为了减轻客户端的负载,避免当多个用户同时提交任务时客户端宕机。
那么,main方法的执行为什么会带来较大的负载呢?执行main方法首先需要下载相关的依赖,还需要抽取拓扑结构(比如JobGraph)便于后续的处理。客户端执行完后还需要把这些都传输给JM。这就使得客户端一是需要格外的网络带宽下载依赖,传输数据给JM; 二是消耗更多的CPU。因此application模式把这部分的工作放在了JM上。
官方推荐在产线上使用应用模式,在测试开发中使用会话模式。
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
这里是Ruby新手。完成一些练习后碰壁了。练习:计算一系列成绩的字母等级创建一个方法get_grade来接受测试分数数组。数组中的每个分数应介于0和100之间,其中100是最大分数。计算平均分并将字母等级作为字符串返回,即“A”、“B”、“C”、“D”、“E”或“F”。我一直返回错误:avg.rb:1:syntaxerror,unexpectedtLBRACK,expecting')'defget_grade([100,90,80])^avg.rb:1:syntaxerror,unexpected')',expecting$end这是我目前所拥有的。我想坚持使用下面的方法或.join,
我是Google云的新手,我正在尝试对其进行首次部署。我的第一个部署是RubyonRails项目。我基本上是在关注thisguideinthegoogleclouddocumentation.唯一的区别是我使用的是我自己的项目,而不是他们提供的“helloworld”项目。这是我的app.yaml文件runtime:customvm:trueentrypoint:bundleexecrackup-p8080-Eproductionconfig.ruresources:cpu:0.5memory_gb:1.3disk_size_gb:10当我转到我的项目目录并运行gcloudprevie
我可以在Azure网站上部署RubyonRails吗? 最佳答案 还没有。目前仅支持.NET和PHP。 关于ruby-on-rails-RubyonRails可以部署在Azure网站上吗?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/12964010/
在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList()Obt
前置步骤我们都操作完了,这篇开始介绍jenkins的集成。话不多说,看操作1、登录进入jenkins后会让你选择安装插件,选择第一个默认的就行。安装完成后设置账号密码,重新登录。2、配置JDK和Git都需要执行路径,所以需要先把执行路径找到,先进入服务器的docker容器,2.1JDK的路径root@69eef9ee86cf:/usr/bin#echo$JAVA_HOME/usr/local/openjdk-82.2Git的路径root@69eef9ee86cf:/#whichgit/usr/bin/git3、先配置JDK和Git。点击:ManageJenkins>>GlobalToolCon
深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal
基础版云数据库RDS的产品系列包括基础版、高可用版、集群版、三节点企业版,本文介绍基础版实例的相关信息。RDS基础版实例也称为单机版实例,只有单个数据库节点,计算与存储分离,性价比超高。说明RDS基础版实例只有一个数据库节点,没有备节点作为热备份,因此当该节点意外宕机或者执行重启实例、变更配置、版本升级等任务时,会出现较长时间的不可用。如果业务对数据库的可用性要求较高,不建议使用基础版实例,可选择其他系列(如高可用版),部分基础版实例也支持升级为高可用版。基础版与高可用版的对比拓扑图如下所示。优势 性能由于不提供备节点,主节点不会因为实时的数据库复制而产生额外的性能开销,因此基础版的性能相对于
Ocra无法处理需要“tk”的应用程序require'tk'puts'nope'用奥克拉http://github.com/larsch/ocra不起作用(如链接中的一个问题所述)问题:https://github.com/larsch/ocra/issues/29(Ocra是1.9的"new"rubyscript2exe,本质上它用于将rb脚本部署为可执行文件)唯一的问题似乎是缺少tcl的DLL文件我不认为这是一个问题据我所知,问题是缺少tk的DLL文件如果它们是已知的,则可以在执行ocra时将它们包括在内有没有办法知道tk工作所需的DLL依赖项? 最佳答
我有一个类unzipper.rb,它使用Rubyzip解压文件。在我的本地环境中,我可以成功解压缩文件,而无需使用require'zip'明确包含依赖项但是在Heroku上,我得到一个NameError(uninitializedconstantUnzipper::Zip)我只能通过使用明确的require来解决问题:为什么这在Heroku环境中是必需的,但在本地主机上却不是?我的印象是Rails自动需要所有gem。app/services/unzipper.rbrequire'zip'#OnlyrequiredforHeroku.Workslocallywithout!class