草庐IT

Apache Flink——一些重要的概念

小波同学 2023-09-17 原文

一、数据流图(Dataflow Graph)

所有的 Flink 程序都可以归纳为由三部分构成:Source、Transformation 和 Sink。

  • Source 表示“源算子”,负责读取数据源。
  • Transformation 表示“转换算子”,利用各种算子进行处理加工。
  • Sink 表示“下沉算子”,负责数据的输出。

Flink 程序会被映射成所有算子按照逻辑顺序连接在一起的一张图,这被称为“逻辑数据流”(logical dataflow),或者叫“数据流图”(dataflow graph)。我们提交作业之后,打开 Flink 自带的 Web UI,点击作业就能看到对应的 dataflow。

在运行时,Flink 上运行的程序会被映射成“逻辑数据流”(dataflows),它包含了这三部分。每一个 dataflow 以一个或多个 sources 开始以一个或多个 sinks 结束。dataflow 类似于任意的有向无环图(DAG)。在大部分情况下,程序中的转换运算(transformations)跟 dataflow 中的算子(operator)是 一 一 对应的关系,但有时候,一个 transformation 可能对应多个 operator。

二、并行度(Parallelism)

如果说 Spark基于 MapReduce 架构的思想是“数据不动代码动”,那么 Flink 就类似“代码不动数据流动”,原因就在于流式数据本身是连续到来的、我们不会同时传输所有数据,这其实是更符合数据流本身特点的处理方式。

一个特定算子的子任务(subtask)的个数被称之为其并行度。 这样,包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务。一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。

一个并行的任务,需要占用的slot数是整个任务最大并行的数量,也就是你设置的Parallelism在整个处理流程重最大的数量。

并行度优先级:代码层面 > 全局层面 > Web UI 层面 > 集群配置文件层面

在不设置并行度的情况下,并行度的数量取决于CPU有多少核心

三、算子链(Operator Chain)

数据传输的方式:

3.1 一对一(One-to-one,forwarding)

这种模式下,数据流维护着分区以及元素的顺序。比如图中的 source 和 map 算子,source算子读取数据之后,可以直接发送给 map 算子做处理,它们之间不需要重新分区,也不需要调整数据的顺序。这就意味着 map 算子的子任务,看到的元素个数和顺序跟 source 算子的子任务产生的完全一样,保证着“一对一”的关系。map、filter、flatMap 等算子都是这种 one-to-one的对应关系。

这种关系类似于 Spark 中的窄依赖。

3.2 重分区(Redistributing)

在这种模式下,数据流的分区会发生改变。
每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。例如,keyBy()是分组操作,本质上基于键(key)的哈希值(hashCode)进行了重分区;而当并行度改变时,比如从并行度为 2 的 window 算子,要传递到并行度为 1 的 Sink 算子,这时的数据传输方式是再平衡(rebalance),会把数据均匀地向下游子任务分发出去。这些传输方式都会引起重分区(redistribute)的过程,这一过程类似于 Spark 中的 shuffle。

总体说来,这种算子间的关系类似于 Spark 中的宽依赖。

在 Flink 中,并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task),这样原来的算子就成为了真正任务里的一部分,如图 所示。每个 task会被一个线程执行。这样的技术被称为“算子链”(Operator Chain)。

Flink 为什么要有算子链这样一个设计呢?
这是因为将算子链接成 task 是非常有效的优化:可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。

Flink 默认会按照算子链的原则进行链接合并,如果我们想要禁止合并或者自行定义,也可以在代码中对算子做一些特定的设置:

// 禁用算子链
.map(word -> Tuple2.of(word, 1L)).disableChaining();

// 从当前算子开始新链
.map(word -> Tuple2.of(word, 1L)).startNewChain()

四、作业图(JobGraph)与执行图(ExecutionGraph)

逻辑流图也就是数据流图

Flink 中任务调度执行的图,按照生成顺序可以分成四层:
逻辑流图(StreamGraph)→ 作业图(JobGraph)→ 执行图(ExecutionGraph)→ 物理图(Physical Graph)

4.1 逻辑流图(StreamGraph)

这是根据用户通过 DataStream API 编写的代码生成的最初的 DAG 图,用来表示程序的拓扑结构。这一步一般在客户端完成。

我们可以看到,逻辑流图中的节点,完全对应着代码中的四步算子操作:
源算子 Source(socketTextStream())→扁平映射算子 Flat Map(flatMap()) →分组聚合算子Keyed Aggregation(keyBy/sum()) →输出算子 Sink(print())。

4.2 作业图(JobGraph)

StreamGraph 经过优化后生成的就是作业图(JobGraph),这是提交给 JobManager 的数据结构,确定了当前作业中所有任务的划分。主要的优化为: 将多个符合条件的节点链接在一起合并成一个任务节点,形成算子链,这样可以减少数据交换的消耗。JobGraph 一般也是在客户端生成的,在作业提交时传递给 JobMaster。

分组聚合算子(Keyed Aggregation)和输出算子 Sink(print)并行度都为 2,而且是一对一的关系,满足算子链的要求,所以会合并在一起,成为一个任务节点。

4.3 执行图(ExecutionGraph)

JobMaster 收到 JobGraph 后,会根据它来生成执行图(ExecutionGraph)。ExecutionGraph是 JobGraph 的并行化版本,是调度层最核心的数据结构。与 JobGraph 最大的区别就是按照并行度对并行子任务进行了拆分,并明确了任务间数据传输的方式。

4.4 物理图(Physical Graph)

JobMaster 生成执行图后, 会将它分发给 TaskManager;各个 TaskManager 会根据执行图部署任务,最终的物理执行过程也会形成一张“图”,一般就叫作物理图(Physical Graph)。这只是具体执行层面的图,并不是一个具体的数据结构。物理图主要就是在执行图的基础上,进一步确定数据存放的位置和收发的具体方式。有了物理图,TaskManager 就可以对传递来的数据进行处理计算了。

所以我们可以看到,程序里定义了四个算子操作:源(Source)->转换(flatMap)->分组聚合(keyBy/sum)->输出(print);合并算子链进行优化之后,就只有三个任务节点了;再考虑并行度后,一共有 5 个并行子任务,最终需要 5 个线程来执行。

参考:
https://blog.csdn.net/weixin_45417821/article/details/124101562

有关Apache Flink——一些重要的概念的更多相关文章

  1. ruby-on-rails - 如何生成传递一些自定义参数的 `link_to` URL? - 2

    我正在使用RubyonRails3.0.9,我想生成一个传递一些自定义参数的link_toURL。也就是说,有一个articles_path(www.my_web_site_name.com/articles)我想生成如下内容:link_to'Samplelinktitle',...#HereIshouldimplementthecode#=>'http://www.my_web_site_name.com/articles?param1=value1¶m2=value2&...我如何编写link_to语句“alàRubyonRailsWay”以实现该目的?如果我想通过传递一些

  2. ruby - 找一些句子 - 2

    我想找到在某些文本中找到一些(让它是两个)句子的好方法。什么会更好-使用正则表达式或拆分方法?你的想法?应JeremyStein的要求-有一些例子示例:输入:ThefirstthingtodoistocreatetheCommentmodel.We’llcreatethisinthenormalway,butwithonesmalldifference.IfwewerejustcreatingcommentsforanArticlewe’dhaveanintegerfieldcalledarticle_idinthemodeltostoretheforeignkey,butinthis

  3. ruby block 并从 block 中返回一些东西 - 2

    我正在使用ruby​​1.8.7。p=lambda{return10;}deflab(block)puts'before'putsblock.callputs'after'endlabp以上代码输出为before10after我将相同的代码重构到这里deflab(&block)puts'before'putsblock.callputs'after'endlab{return10;}现在我收到LocalJumpError:意外返回。对我来说,这两个代码都在做同样的事情。是的,在第一种情况下我传递了一个过程,在第二种情况下我传递了一个block。但是&block将该block转换为pro

  4. ruby - 如果键存在,向散列值添加一些东西? - 2

    我在Ruby中有一个哈希:hash=Hash.new里面有一些键值对,比如说:hash[1]="One"hash[2]="Two"如果散列包含键2,那么我想将“Bananas”添加到它的值中。如果散列没有键2,我想创建一个新的键值对2=>"Bananas"。我知道我可以通过首先使用has_key?检查散列是否具有key2来做到这一点,然后采取相应的行动。但这需要一个if语句和不止一行。那么是否有一种简单、优雅的单行代码可以实现这一目标? 最佳答案 这个有效:hash[2]=(hash[2]||'')+'Bananas'如果您希望所有

  5. ruby - Camping 和 Sinatra 之间有什么重要区别吗? - 2

    我的感觉是Camping和Sinatra之间的差异不是很大,您可以安全地选择其中任何一个并且没问题。但我想问问Ruby专家,这是不是真的。Sinatra和Camping微框架之间实际上有什么重要区别吗?您将如何决定使用哪一个? 最佳答案 我知道的唯一显着区别是Camping像Rails一样基于MVC模式,并且与ActiveRecord耦合。Sinatra更加不可知。Camping也不再维护,而Sinatra正在积极开发中。仅这一点就足以让我们先看看Sinatra。编辑:感谢Philippe的更正,很高兴听到Camping的开发正在进

  6. ruby - =~ 运算符的顺序重要吗? - 2

    下面两个语句除了编码风格有区别吗?/regex/=~"some_string_with_regex""some_string_with_regex"=~/regex/ 最佳答案 是的,有区别。正如在http://www.ruby-doc.org/core/classes/Regexp.html#M001232中提到的If=~isusedwitharegexpliteralwithnamedcaptures,capturedstrings(ornil)isassignedtolocalvariablesnamedbythecaptur

  7. ruby - 使用 gmail gem 跟踪一些电子邮件 - 2

    我正在使用gmailgem发送电子邮件,我需要跟踪这些电子邮件。我该怎么做?我正在尝试搜索带有message_id的电子邮件,但它会从我的收件箱中提取所有电子邮件,而我只想要特定电子邮件的回复。这是我的实际代码:*使用message_id保存电子邮件*mail=gmail.deliver(email)Email.create(:message_id=>mail.message_id,:from=>user.email,:to=>annotation.to,:body=>annotation.content,:title=>annotation.title,:annotation=>an

  8. ruby - 一些简单的 Ruby 问题——迭代器、 block 和符号 - 2

    我的背景是PHP和C#,但我真的很想学习RoR。为此,我开始阅读官方文档。我对一些代码示例有一些疑问。第一个是迭代器:classArraydefinject(n)each{|value|n=yield(n,value)}nenddefsuminject(0){|n,value|n+value}enddefproductinject(1){|n,value|n*value}endend我理解yield的意思是“在这里执行关联的block”。令我震惊的是|value|n=each的一部分。其他block对我来说更有意义,因为它们似乎模仿C#风格的lambda:publicintsum(in

  9. WebSocket的那些事(1-概念篇) - 2

    目录一、什么是Websocket二、WebSocket部分header介绍三、HTTPVSWebSocket四、什么时候使用WebSockets五、关于SockJS和STOMP一、什么是Websocket根据RFC6455标准,Websocket协议提供了一种标准化的方式在客户端和服务端之间通过TCP连接建立全双工、双向通信渠道。它是一种不同于HTTP的TCP协议,但是被设计为在HTTP基础上运行。Websocket交互始于HTTP请求,该请求会通过HTTPUpgrade请求头去升级请求,进而切换到Websocket协议。请求报文如下:GET/spring-websocket-portfoli

  10. 半个月狂飙1000亿,ChatGPT概念股凭什么? - 2

    ChatGPT掀起了AI股历史上最疯狂的一轮市值狂飙。自春节后至今,ChatGPT概念股开始了暴走模式,短短半月时间,海天瑞声、开普云等ChatGPT概念股市值累计增加了近1400亿。如此的爆炸效应,得益于ChatGPT所展现出商业化落地的巨大潜力。要知道,在此之前,无论是十年AI投入超千亿的百度,还是困在硬件化里的AI四小龙,都在重复着AI商业化难落地的故事。ChatGPT的出现,让AI从生产力的赋能者直接成为一种创造生产力的工具。随着订阅模式的推出,ChatGPT已经成为第一个以AI技术为核心直接变现的消费者应用。本文持有以下核心观点:1、ChatGPT是AI技术迭代的受益者。过去受限技术

随机推荐