草庐IT

Apache Flink——容错机制相关概念

小波同学 2023-09-25 原文

前言

这里把各种资料里认为和容错有关的概念放在一起来解释,这样或许能更好的理解Flink强大的容错机制。主要的概念有四个:Stage、Checkpoint、SavePoint、Barrier。

一、Flink容错

Apache Flink 提供了可以恢复数据流应用到一致状态的容错机制。确保在发生故障时,程序的每条记录只会作用于状态一次(exactly-once),当然也可以降级为至少一次(at-least-once)。

容错机制核心通过持续创建分布式数据流及其状态一致性的快照来实现。对于状态占用空间小的流应用,这些快照非常轻量,可以高频率创建而对性能影响很小。流计算应用的状态保存在一个可配置的环境,如:master 节点或者 HDFS上。

在遇到程序故障时(如机器、网络、软件等故障),Flink 停止分布式数据流。系统重启所有 operator ,重置其到最近成功的 checkpoint。输入重置到相应的状态快照位置。保证被重启的并行数据流中处理的任何一个 record 都不是 checkpoint 状态之前的一部分。

注意:为了容错机制生效,数据源(例如 消息队列 或者 broker)需要能重放数据流。Apache Kafka 有这个特性,Flink 中 Kafka 的 connector 利用了这个功能。由于 Flink 的 checkpoint 是通过分布式快照实现的,接下来我们将 snapshot 和 checkpoint 这两个词交替使用。

二、状态(State)

状态是为了解决“有过程”的计算场景下故障恢复用的。什么是“有过程”,如果在dataflow中,一次只计算一条数据,此时出现故障,只需要重新消费一次即可,但如果在一个窗口内,一次有很多数据需要计算,对于中间的计算结果保存下来,方便故障时进行“断点恢复”,那么这些操作就成为stageful(有状态的),后端存储(State Backend)默认RocksDB。

operator 包含任何形式的状态,这些状态都必须包含在快照中。状态有很多种形式:

  • 1、用户自定义状态:由 transformation 函数例如( map() 或者 filter())直接创建或者修改的状态。用户自定义状态可以是:转换函数中的 Java 对象的一个简单变量或者函数关联的 key/value 状态。

  • 2、系统状态:这种状态是指作为 operator 计算中一部分缓存数据。典型例子就是: 窗口缓存(window buffers),系统收集窗口对应数据到其中,直到窗口计算和发射。

operator 在收到所有输入数据流中的 barrier 之后,在发射 barrier 到其输出流之前对其状态进行快照。此时,在 barrier 之前的数据对状态的更新已经完成,不会再依赖 barrier 之前数据。由于快照可能非常大,所以后端存储系统可配置。默认是存储到 JobManager 的内存中,但是对于生产系统,需要配置成一个可靠的分布式存储系统(例如 HDFS)。状态存储完成后,operator 会确认其 checkpoint 完成,发射出 barrier 到后续输出流。快照现在包含了:

  • 1、对于并行输入数据源:快照创建时数据流中的位置偏移。

  • 2、对于 operator:存储在快照中的状态指针。

三、检查点(Checkpoint)

Flink使用stream replay和checkpointing来实现容错。Checkpoint通过对stream和operator都做快照(snapshot)来记录状态,这样才能够在保证在流处理系统失败时能够正确地恢复数据流处理。Checkpoint是Flink周期性自动做的,支持全量和增量。

四、保存点(Savepoint)

Savepoint和Checkpoint类似,是用来保存程序和Flink Cluster的State(状态),它和Checkpoint的主要区别有两点:

    1. 手动触发生成
    1. 不会自动过期

可以通过命令行或REST API的方式来创建Savepoint。

四、Barrier机制

Flink分布式快照的核心概念之一就是Barrier(数据栅栏)。这些Barrier被插入到数据流中,作为数据流的一部分和数据流一起向下流动。Barrier不会干扰正常的数据,数据流严格有序。一个Barrier把数据流分割成两部分:一部分进入到当前快照,另一部分进入到下一个快照。每个Barrier都带有快照ID,并且Barrier之前的数据都进入了此快照。Barrier不会干扰数据流处理,所以非常轻量。多个不同快照的多个Barrier会在流中同时出现,即多个快照可能会同时被创建。

Barrier在数据源读入的时候被插入,当snapshot n的Barrier插入后,系统会记录当前snapshot在数据源中的位置n(用Sn标识)。比如,在Kafka中,这个变量标识某个分区中的最后一条数据的偏移量(offset)。这个位置值Sn会被发送到一个称为checkpoint coordinator的模块(即Flink的JobManager)。

然后Barrier随着正常数据继续往下流动,当一个operator从其所有的输入流都接收到snapshot n的Barrier时,它会向其所有输出流插入一个标识(也叫snapshot n)的Barrier。当sink operator(即DAG流的终点)从其输入流接收到所有的Barrier n时,表示这一批数据处理完成,它会向checkpoint coordinator发送消息确认snapshot n已完成。当所有sink都确认了这个snapshot,则标识本次处理已成功,该snapshot被标识为已完成。

在上诉处理流程中,operator上游会接收很多流,每个流的快慢又不一致,如何保证每个snapshot都都能放在一起被输出呢?这就靠operator的“对齐(align)”功能。

上图解释了“align”的过程,核心思想就是“快流等慢流”,拿barrier n来说,上面的流到的早,此后operator就暂时不会继续处理后续的数据了(否则会导致snapshot n和snapshot n+1的数据混在一起了),而是会用“input buffer”把它对应的数据先保存起来,等下面流的barrier n也到来时,operator就把内部的所有数据都向下发出去,并对下游也插入一个barrier n来标识本次对齐完成。

基于Stream Aligning操作能够实现Exactly Once语义,但是也会给流处理应用带来延迟,因为为了排列对齐Barrier,会暂时缓存一部分Stream的记录到Buffer中,尤其是在数据流并行度很高的场景下可能更加明显,通常以最迟对齐Barrier的一个Stream为处理Buffer中缓存记录的时刻点。在Flink中,提供了一个开关,选择是否使用Stream Aligning,如果关掉则Exactly Once会变成At least once。

五、Recovery

在这种容错机制下的错误回复很明显:一旦遇到故障,Flink 选择最近一个完成的 checkpoint k。系统重新部署整个分布式数据流,重置所有 operator 的状态到 checkpoint k。数据源被置为从 Sk 位置读取。例如在 Apache Kafka 中,意味着让消费者从 Sk 处偏移开始读取。

如果是增量快照,operator 需要从最新的全量快照回复,然后对此状态进行一系列增量更新。

参考:
http://www.circleblog.net/?p=1230

http://t.zoukankan.com/love-yh-p-11386117.html

https://blog.51cto.com/u_15127688/4370359

有关Apache Flink——容错机制相关概念的更多相关文章

  1. ruby-on-rails - 相关表上的范围为 "WHERE ... LIKE" - 2

    我正在尝试从Postgresql表(table1)中获取数据,该表由另一个相关表(property)的字段(table2)过滤。在纯SQL中,我会这样编写查询:SELECT*FROMtable1JOINtable2USING(table2_id)WHEREtable2.propertyLIKE'query%'这工作正常:scope:my_scope,->(query){includes(:table2).where("table2.property":query)}但我真正需要的是使用LIKE运算符进行过滤,而不是严格相等。然而,这是行不通的:scope:my_scope,->(que

  2. ruby-on-rails - 在具有 ActiveRecord 条件的相关模型中按字段排序 - 2

    我正在尝试按Rails相关模型中的字段进行排序。我研究的所有解决方案都没有解决如果相关模型被另一个参数过滤?元素模型classItem相关模型:classPriority我正在使用where子句检索项目:@items=Item.where('company_id=?andapproved=?',@company.id,true).all我需要按相关表格中的“位置”列进行排序。问题在于,在优先级模型中,一个项目可能会被多家公司列出。因此,这些职位取决于他们拥有的company_id。当我显示项目时,它是针对一个公司的,按公司内的职位排序。完成此任务的正确方法是什么?感谢您的帮助。PS-我

  3. ruby - 使用指向 ruby​​ 可执行文件的符号链接(symbolic link)时查找相关库 - 2

    假设您有一个可执行文件foo.rb,其库bar.rb的布局如下:/bin/foo.rb/lib/bar.rb在foo.rb的header中放置以下要求以在bar.rb中引入功能:requireFile.dirname(__FILE__)+"../lib/bar.rb"只要对foo.rb的所有调用都是直接的,这就可以正常工作。如果你把$HOME/project和符号链接(symboliclink)foo.rb放入$HOME/usr/bin,然后__FILE__解析为$HOME/usr/bin/foo.rb,因此无法找到bar.rb关于foo.rb的目录名.我意识到像ruby​​gems这

  4. HarmonyOS原子化服务开发相关术语 - 2

    术语中文解释Ability原子化服务帮助用户完成任务的原子化服务,和用户的意图进行关联。Fulfillment服务履行通过图标,卡片,语音等形式呈现用户意图。开发者通过接口的方式,处理用户意图,返回内容。Intent意图用于表达用户想要达成的目标或完成的任务。HUAWEIAssistant智能助手“无微不智”的个人助手,通过不断的学习用户的使用习惯,不断的为用户提供贴心的精准的便捷的个性化服务。AISearch全局搜索用户可快速搜索关键词,与之匹配的原子化服务则会出现在搜索结果中。SmartService智慧服务用户订阅原子化服务,在到达特定触发条件(时间、地点、事件)后,卡片推送至用户智能助

  5. H2数据库配置及相关使用方式一站式介绍(极为详细并整理官方文档) - 2

    目录H2数据库入门以及实际开发时的使用1.H2数据库的初识1.1H2数据库介绍1.2为什么要使用嵌入式数据库?1.3嵌入式数据库对比1.3.1性能对比1.4技术选型思考2.H2数据库实战2.1H2数据库下载搭建以及部署2.1.1H2数据库的下载2.1.2数据库启动2.1.2.1windows系统可以在bin目录下执行h2.bat2.1.2.2同理可以通过cmd直接使用命令进行启动:2.1.2.3启动后控制台页面:2.1.3spring整合H2数据库2.1.3.1引入依赖文件2.1.4数据库通过file模式实际保存数据的位置2.2H2数据库操作2.2.1Mysql兼容模式2.2.2Mysql模式

  6. ruby-on-rails - 旧的 Ruby 错误在我的 Ruby on Rails 应用程序中反复出现,与 Class.create 和 delayed_job 相关 - 2

    这个错误已经有好几个月了,在这里:http://www.ruby-forum.com/topic/1094002其中显示代码更改的两个链接:https://github.com/godfat/ruby/commit/f4e0e8f781b05c767ad2472a43a4ed0727a75708https://github.com/godfat/ruby/commit/c7a6cf975d88828c2ed27d253f41c480f9b66ad6我有Ruby1.9.2和rvm。我会把这些更改粘贴到适当的文件中,但我不知道如何粘贴。这在几天前就奏效了。我不能像这样执行RubyonRai

  7. WebSocket的那些事(1-概念篇) - 2

    目录一、什么是Websocket二、WebSocket部分header介绍三、HTTPVSWebSocket四、什么时候使用WebSockets五、关于SockJS和STOMP一、什么是Websocket根据RFC6455标准,Websocket协议提供了一种标准化的方式在客户端和服务端之间通过TCP连接建立全双工、双向通信渠道。它是一种不同于HTTP的TCP协议,但是被设计为在HTTP基础上运行。Websocket交互始于HTTP请求,该请求会通过HTTPUpgrade请求头去升级请求,进而切换到Websocket协议。请求报文如下:GET/spring-websocket-portfoli

  8. 半个月狂飙1000亿,ChatGPT概念股凭什么? - 2

    ChatGPT掀起了AI股历史上最疯狂的一轮市值狂飙。自春节后至今,ChatGPT概念股开始了暴走模式,短短半月时间,海天瑞声、开普云等ChatGPT概念股市值累计增加了近1400亿。如此的爆炸效应,得益于ChatGPT所展现出商业化落地的巨大潜力。要知道,在此之前,无论是十年AI投入超千亿的百度,还是困在硬件化里的AI四小龙,都在重复着AI商业化难落地的故事。ChatGPT的出现,让AI从生产力的赋能者直接成为一种创造生产力的工具。随着订阅模式的推出,ChatGPT已经成为第一个以AI技术为核心直接变现的消费者应用。本文持有以下核心观点:1、ChatGPT是AI技术迭代的受益者。过去受限技术

  9. ruby - Ruby 是否提供响应 OS X 上的 Apple 事件的机制? - 2

    我正在使用Ruby-Tk为OSX开发一个桌面应用程序,我想为该应用程序提供一个AppleEvents接口(interface)。这意味着应用程序将定义它将响应的AppleScript命令的字典(对应于发送到应用程序的Apple事件),并且用户/其他应用程序可以使用AppleScript命令编写Ruby-Tk应用程序的脚本。其他脚本语言支持此类功能——Python通过位于http://appscript.svn.sourceforge.net/viewvc/appscript/py-aemreceive/的py-aemreceive库和Tcl通过位于http://tclae.source

  10. ruby-on-rails - Searchkick 结果不相关 - 2

    我对相关搜索有疑问。以下请求的结果很奇怪:Candidate.search('martin',fields:[:first_name,:last_name],match::word_start,misspellings:false).map(&:name)["KautzerMartina","FunkMartin","JaskolskiMartin","GutmannMartine","WiegandMartina","SchuellerMartin","DooleyMartin","StiedemannMartine","BartellMartina","GerlachMartine

随机推荐