关注公众号:大数据技术派,回复“资料”,领取1000G资料。
本文首发于我的个人博客:Flink状态管理
Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State):
public class ListStateDemo extends RichFlatMapFunction<Tuple2<String, Long>,List<Tuple2<String, Long>>>{
private transient ListState<Tuple2<String, Long>> listState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ListStateDescriptor<Tuple2<String, Long>> listStateDescriptor = new ListStateDescriptor(
"listState",
TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})
);
listState = getRuntimeContext().getListState(listStateDescriptor);
}
@Override
public void flatMap(Tuple2<String, Long> value, Collector<List<Tuple2<String, Long>>> out) throws Exception {
List<Tuple2<String, Long>> currentListState = Lists.newArrayList(listState.get().iterator());
currentListState.add(value);
listState.update(currentListState);
out.collect(currentListState);
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Long>> dataStream = senv.fromElements(
Tuple2.of("a", 50L),Tuple2.of("a", 60L),Tuple2.of("a", 70L),
Tuple2.of("b", 50L),Tuple2.of("b", 60L),Tuple2.of("b", 70L),
Tuple2.of("c", 50L),Tuple2.of("c", 60L),Tuple2.of("c", 70L)
);
dataStream
.keyBy(0)
.flatMap(new ListStateDemo())
.print();
senv.execute(ListStateDemo.class.getSimpleName());
}
}
Flink为算子状态提供三种基本数据结构:
public class OperateStateDemo extends RichFlatMapFunction<Tuple2<String, Long>, List<Tuple2<String, Long>>>
implements CheckpointedFunction{
private final int threshold;
private transient ListState<Tuple2<String, Long>> checkpointedState;
private List<Tuple2<String, Long>> bufferedElements;
public OperateStateDemo(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void flatMap(Tuple2<String, Long> value, Collector<List<Tuple2<String, Long>>> out) throws Exception {
bufferedElements.add(value);
if(bufferedElements.size() == threshold) {
out.collect(bufferedElements);
bufferedElements.clear();
}
}
/**
* 进行checkpoint快照
* @param context
* @throws Exception
*/
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for(Tuple2<String, Long> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Long>> listStateDescriptor = new ListStateDescriptor(
"listState",
TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})
);
checkpointedState = context.getOperatorStateStore().getListState(listStateDescriptor);
// 如果是故障恢复
if(context.isRestored()) {
for(Tuple2<String, Long> element : checkpointedState.get()) {
bufferedElements.add(element);
}
checkpointedState.clear();
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
senv.getCheckpointConfig().setCheckpointInterval(500);
DataStream<Tuple2<String, Long>> dataStream = senv.fromElements(
Tuple2.of("a", 50L),Tuple2.of("a", 60L),Tuple2.of("a", 70L),
Tuple2.of("b", 50L),Tuple2.of("b", 60L),Tuple2.of("b", 70L),
Tuple2.of("c", 50L),Tuple2.of("c", 60L),Tuple2.of("c", 70L)
);
dataStream
.flatMap(new OperateStateDemo(2))
.print();
senv.execute(OperateStateDemo.class.getSimpleName());
}
}
对于Keyed State和Operator State这两种状态,他们的横向伸缩机制不太相同。由于每个Keyed State总是与某个Key相对应,当横向伸缩时,Key总会被自动分配到某个算子子任务上,因此Keyed State会自动在多个并行子任务之间迁移。对于一个非KeyedStream,流入算子子任务的数据可能会随着并行度的改变而改变。如上图所示,假如一个应用的并行度原来为2,那么数据会被分成两份并行地流入两个算子子任务,每个算子子任务有一份自己的状态,当并行度改为3时,数据流被拆成3支,或者并行度改为1,数据流合并为1支,此时状态的存储也相应发生了变化。对于横向伸缩问题,Operator State有两种状态分配方式:一种是均匀分配,另一种是将所有状态合并,再分发给每个实例上。
Checkpoint指定触发生成时间间隔后,每当需要触发Checkpoint时,会向Flink程序运行时的多个分布式的Stream Source中插入一个Barrier标记,这些Barrier会根据Stream中的数据记录一起流向下游的各个Operator。当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录。因为一个Operator可能存在多个输入的Stream,而每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐),在等待所有Barrier到达的过程中,Operator的Buffer中可能已经缓存了一些比Barrier早到达Operator的数据记录(Outgoing Records),这时该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入,最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据。
Checkpoint 其他的属性包括:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000);
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 开启在 job 中止后仍然保留的 externalized checkpoints
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 允许在有更近 savepoint 时回退到 checkpoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
保存多个Checkpoint
默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint,而当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近4个小时数据记录处理有问题,希望将整个状态还原到4小时之前。
Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数:
state.checkpoints.num-retained: 20
保留了最近的20个Checkpoint。如果希望会退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现。
从Checkpoint进行恢复
从指定的checkpoint处启动,最近的一个/flink/checkpoints/workFlowCheckpoint/339439e2a3d89ead4d71ae3816615281/chk-1740584启动,通常需要先停掉当前运行的flink-session,然后通过命令启动:
../bin/flink run -p 10 -s /flink/checkpoints/workFlowCheckpoint/339439e2a3d89ead4d71ae3816615281/chk-1740584/_metadata -c com.code2144.helper_wink-1.0-SNAPSHOT.jar
可以把命令放到脚本里面,每次直接执行checkpoint恢复脚本即可:
# 触发指定id的作业的Savepoint,并将结果存储到指定目录下
bin/flink savepoint :jobId [:targetDirectory]
手动savepoint
/app/local/flink-1.6.2/bin/flink savepoint 0409251eaff826ef2dd775b6a2d5e219 [hdfs://bigdata/path]
成功触发savepoint通常会提示:Savepoint completed. Path: hdfs://path...:
手动取消任务
与checkpoint异常停止或者手动Kill掉不一样,对于savepoint通常是我们想要手动停止任务,然后更新代码,可以使用flink cancel ...命令:
/app/local/flink-1.6.2/bin/flink cancel 0409251eaff826ef2dd775b6a2d5e219
从指定savepoint启动job
bin/flink run -p 8 -s hdfs:///flink/savepoints/savepoint-567452-9e3587e55980 -c com.code2144.helper_workflow.HelperWorkFlowStreaming jars/BSS-ONSS-Flink-1.0-SNAPSHOT.jar
// 配置 FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
// 配置 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
配置 RocksDBStateBackend 时,需要额外导入下面的依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.9.0</version>
</dependency>
第二种方式:基于 flink-conf.yaml 配置文件的方式进行配置,对所有部署在该集群上的作业都生效:
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
EXACTLY_ONCE语义简称EOS,指的是每条输入消息只会影响最终结果一次,注意这里是影响一次,而非处理一次,Flink一直宣称自己支持EOS,实际上主要是对于Flink应用内部来说的,对于外部系统(端到端)则有比较强的限制
At-Least-Once或At-Most-Once语义,从0.11版本开始,引入了幂等发送和事务,从而开始保证EXACTLY_ONCE语义。
| Maven依赖 | 开始支持的版本 | 生产/消费 类名 | kafka版本 | 注意 |
|---|---|---|---|---|
| flink-connector-kafka-0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08<br>FlinkKafkaProducer08 | 0.8.x | 使用Kafka内部SimpleConsumer API. Flink把Offsets提交到ZK |
| flink-connector-kafka-0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09<br>FlinkKafkaProducer09 | 0.9.x | 使用新版Kafka Consumer API. |
| flink-connector-kafka-0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010<br>FlinkKafkaProducer010 | 0.10.x | 支持Kafka生产/消费消息带时间戳 |
| flink-connector-kafka-0.11_2.11 | 1.4.0 | FlinkKafkaConsumer011<br>FlinkKafkaProducer011 | 0.11.x | 由于0.11.x Kafka不支持scala 2.10。此连接器支持Kafka事务消息传递,以便为生产者提供exactly once语义。 |
flink-connector-kafka_2.11 |
1.7.0 | FlinkKafkaConsumer<br>FlinkKafkaProducer | >=1.0.0 | 高版本向后兼容。但是,对于Kafka 0.11.x和0.10.x版本,我们建议分别使用专用的flink-connector-Kafka-0.11_2.11和link-connector-Kafka-0.10_2.11 |
public class FlinkKafkaDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
senv.enableCheckpointing(1000);
senv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// kafka 数据源
Map<String, String> config = Configuration.initConfig("commons.xml");
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", config.get("kafka-ipport"));
kafkaProps.setProperty("group.id", config.get("kafka-groupid"));
SingleOutputStreamOperator<String> dataStream = senv.addSource(
new FlinkKafkaConsumer011(
config.get("kafka-topic"),
new SimpleStringSchema(),
kafkaProps
));
// sink 到 kafka
FlinkKafkaProducer011<String> producer011 = new FlinkKafkaProducer011<>(
config.get("kafka-ipport"),
"test-kafka-producer",
new SimpleStringSchema());
producer011.setWriteTimestampToKafka(true);
dataStream.map(x -> {
// 抛出异常
if("name4".equals(JSON.parseObject(x).get("name"))){
System.out.println("name4 exception test...");
// throw new RuntimeException("name4 exception test...");
}
return x;
}).addSink(producer011);
senv.execute(FlinkKafkaDemo.class.getSimpleName());
}
}
Flink由JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。
当 checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流;barrier会在算子间传递下去。
每个算子会对当前的状态做个快照,保存到状态后端。对于source任务而言,就会把当前的offset作为状态保存起来。下次从checkpoint恢复时,source任务可以重新提交偏移量,从上次保存的位置开始重新消费数据。
每个内部的 transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里。
sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务(还不能被消费);当遇到 barrier时,把状态保存到状态后端,并开启新的预提交事务。
当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成。当sink 任务收到确认通知,就会正式提交之前的事务,kafka 中未确认的数据就改为“已确认”,数据就真正可以被消费了。
所以看到,执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。
具体的两阶段提交步骤总结如下:
At-Least-Once或At-Most-Once语义,从0.11版本开始,引入了幂等发送和事务,从而开始保证EXACTLY_ONCE语义。
为了实现Producer的幂等语义,Kafka引入了Producer ID(即PID)和Sequence Number。每个新的Producer在初始化的时候会被分配一个唯一的PID,该PID对用户完全透明而不会暴露给用户。
Producer发送每条消息<Topic, Partition>对于Sequence Number会从0开始单调递增,broker端会为每个<PID, Topic, Partition>维护一个序号,每次commit一条消息此序号加一,对于接收的每条消息,如果其序号比Broker维护的序号(即最后一次Commit的消息的序号)大1以上,则Broker会接受它,否则将其丢弃:
Kafka事务原理进行了详细介绍,完整的流程图如下:

我正在使用i18n从头开始构建一个多语言网络应用程序,虽然我自己可以处理一大堆yml文件,但我说的语言(非常)有限,最终我想寻求外部帮助帮助。我想知道这里是否有人在使用UI插件/gem(与django上的django-rosetta不同)来处理多个翻译器,其中一些翻译器不愿意或无法处理存储库中的100多个文件,处理语言数据。谢谢&问候,安德拉斯(如果您已经在rubyonrails-talk上遇到了这个问题,我们深表歉意) 最佳答案 有一个rails3branchofthetolkgem在github上。您可以通过在Gemfi
我需要在客户计算机上运行Ruby应用程序。通常需要几天才能完成(复制大备份文件)。问题是如果启用sleep,它会中断应用程序。否则,计算机将持续运行数周,直到我下次访问为止。有什么方法可以防止执行期间休眠并让Windows在执行后休眠吗?欢迎任何疯狂的想法;-) 最佳答案 Here建议使用SetThreadExecutionStateWinAPI函数,使应用程序能够通知系统它正在使用中,从而防止系统在应用程序运行时进入休眠状态或关闭显示。像这样的东西:require'Win32API'ES_AWAYMODE_REQUIRED=0x0
当我的预订模型通过rake任务在状态机上转换时,我试图找出如何跳过对ActiveRecord对象的特定实例的验证。我想在reservation.close时跳过所有验证!叫做。希望调用reservation.close!(:validate=>false)之类的东西。仅供引用,我们正在使用https://github.com/pluginaweek/state_machine用于状态机。这是我的预订模型的示例。classReservation["requested","negotiating","approved"])}state_machine:initial=>'requested
我安装了ruby版本管理器,并将RVM安装的ruby实现设置为默认值,这样'哪个ruby'显示'~/.rvm/ruby-1.8.6-p383/bin/ruby'但是当我在emacs中打开inf-ruby缓冲区时,它使用安装在/usr/bin中的ruby。有没有办法让emacs像shell一样尊重ruby的路径?谢谢! 最佳答案 我创建了一个emacs扩展来将rvm集成到emacs中。如果您有兴趣,可以在这里获取:http://github.com/senny/rvm.el
对于作为String#tr参数的单引号字符串文字中反斜杠的转义状态,我觉得有些神秘。你能解释一下下面三个例子之间的对比吗?我特别不明白第二个。为了避免复杂化,我在这里使用了'd',在双引号中转义时不会改变含义("\d"="d")。'\\'.tr('\\','x')#=>"x"'\\'.tr('\\d','x')#=>"\\"'\\'.tr('\\\d','x')#=>"x" 最佳答案 在tr中转义tr的第一个参数非常类似于正则表达式中的括号字符分组。您可以在表达式的开头使用^来否定匹配(替换任何不匹配的内容)并使用例如a-f来匹配一
我目前正在使用以下方法获取页面的源代码:Net::HTTP.get(URI.parse(page.url))我还想获取HTTP状态,而无需发出第二个请求。有没有办法用另一种方法做到这一点?我一直在查看文档,但似乎找不到我要找的东西。 最佳答案 在我看来,除非您需要一些真正的低级访问或控制,否则最好使用Ruby的内置Open::URI模块:require'open-uri'io=open('http://www.example.org/')#=>#body=io.read[0,50]#=>"["200","OK"]io.base_ur
是否有简单的方法来更改默认ISO格式(yyyy-mm-dd)的ActiveAdmin日期过滤器显示格式? 最佳答案 您可以像这样为日期选择器提供额外的选项,而不是覆盖js:=f.input:my_date,as::datepicker,datepicker_options:{dateFormat:"mm/dd/yy"} 关于ruby-on-rails-事件管理员日期过滤器日期格式自定义,我们在StackOverflow上找到一个类似的问题: https://s
在VMware16.2.4安装Ubuntu一、安装VMware1.打开VMwareWorkstationPro官网,点击即可进入。2.进入后向下滑动找到Workstation16ProforWindows,点击立即下载。3.下载完成,文件大小615MB,如下图:4.鼠标右击,以管理员身份运行。5.点击下一步6.勾选条款,点击下一步7.先勾选,再点击下一步8.去掉勾选,点击下一步9.点击下一步10.点击安装11.点击许可证12.在百度上搜索VM16许可证,复制填入,然后点击输入即可,亲测有效。13.点击完成14.重启系统,点击是15.双击VMwareWorkstationPro图标,进入虚拟机主
两个gsub产生不同的结果。谁能解释一下为什么?代码也可在https://gist.github.com/franklsf95/6c0f8938f28706b5644d获得.ver=9999str="\tCFBundleDevelopmentRegion\n\ten\n\tCFBundleVersion\n\t0.1.190\n\tAppID\n\t000000000000000"putsstr.gsub/(CFBundleVersion\n\t.*\.).*()/,"#{$1}#{ver}#{$2}"puts'--------'putsstr.gsub/(CFBundleVersio
我想为我的Task模型创建一个status属性,该属性将按以下顺序指示它在三部分进度中的位置:打开=>进行中=>完成。它的工作方式类似于亚马逊包裹的交付方式:已订购=>已发货=>已交付。我想知道设置此属性的最佳方法是什么。我可能是错的,但创建三个独立的bool属性似乎有点多余。实现此目标的最佳方法是什么? 最佳答案 Rails4有一个内置的enummacro.它使用单个整数列并映射到键列表。classOrderenumstatus:[:ordered,:shipped,:delivered]end状态映射如下:{ordered:0,