文章目录
按照数据的划分和扩张方式,Flink中大致分为2类:
因为一个任务的并行度有多少,就会有多少个子任务,当key的范围大于并行度时,就会出现一个subTask上可能包含多个Key(),但不同Task上不会出现相同的Key(解决了shuffle的问题?)
常用的 MapState、ValueState。
Keyed State 和 Operator State 存在两种形式:managed (托管状态)和 raw(原始状态)。
- 托管状态是由Flink框架管理的状态,原始状态是由用户自行管理状态的具体数据结构。
- 通常所有的 datastream functions 都可以使用托管状态,但是原始状态接口仅仅能够在实现 operators的时候使用。
- 推荐使用 managed state 而不是使用 raw state,因为使用托管状态的时候 Flink 可以在 parallelism 发生改变的情况下能够动态重新分配状态,而且还能更好的进行内存管理。
没有状态的操作
从概念上讲, 源表从来不会在状态中被完全保存。 形如 SELECT … FROM … WHERE
这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。
诸如 join、 聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。看下sum的状态操作
@Internal
public class StreamGroupedReduceOperator<IN>
extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
implements OneInputStreamOperator<IN, IN> {
private static final long serialVersionUID = 1L;
private static final String STATE_NAME = "_op_state";
private transient ValueState<IN> values;
private final TypeSerializer<IN> serializer;
public StreamGroupedReduceOperator(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
super(reducer);
this.serializer = serializer;
}
@Override
public void open() throws Exception {
super.open();
ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer);
//获得value state
values = getPartitionedState(stateId);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
IN value = element.getValue();
IN currentValue = values.value();
//如果currentValue不为null,则说明不是第一次启动,也就是在hdfs上已经存储了中间状态
if (currentValue != null) {
//先做一个聚合,然后再更新,之后输出到下游
IN reduced = userFunction.reduce(currentValue, value);
values.update(reduced);
output.collect(element.replace(reduced));
} else {
//第一次启动直接更新数据,之后输出到下游
values.update(value);
output.collect(element.replace(value));
}
}
}
从 Flink1.10 开始,Flink 默认将 state 内存大小配置为每个 task slot 的托管内存。
调试内存性能的问题主要是通过调整配置项,来提高Flink的托管内存:
taskmanager.memory.managed.size
//推荐使用比例计算
taskmanager.memory.managed.fraction
具体调优案例分析可见:Flink on yarn双流join问题分析+性能调优思路
Flink状态后端主要负责两件事:本地的状态管理、将检查点(checkpoint)状态写入远程存储。
flink state可以存储在java堆内存内或者内存之外。
默认情况下,使用MemoryStateBackend,Flink的state会保存在taskManager的内存中,而checkpoint会保存在jobManager的内存中。
flink提供三种开箱即用的State Backend:
| 状态后端 | 数据存储 | 容量限制 | 场景 |
|---|---|---|---|
| MemoryStateBackend |
|
|
|
| FsStateBackend |
|
|
|
| RocksDBStateBackend |
|
|
|
Keyed States 和 Operator States 会存储在一个带有编号的 chk 目录中,比如说一个 flink 任务的 Keyed States 的 subTask 个数是4,Operator States 对应的 subTask 也是 4,那么 chk 会存一个元数据文件 _metadata ,四个 Keyed States 文件,四个 Operator States 的文件。
也就是说 Keyed States 和 Operator States 会分别存储 subTask 总数个状态文件。
一般需求,我们的 Checkpoint 时间间隔可以设置为分钟级别(1-5 分钟)。
对于状态很大的任务每次 Checkpoint 访问 HDFS 比较耗时,可以设置为 5~10 分钟一次Checkpoint,并且调大两次 Checkpoint 之间的暂停间隔,例如设置两次 Checkpoint 之 间至少暂停 4 或 8 分钟。
具体案例分析可见:Flink on yarn双流join问题分析+性能调优思路
如果 Checkpoint 语义配置为 EXACTLY_ONCE,那么在 Checkpoint 过程中还会存在 barrier 对齐的过程,可以通过 Flink Web UI 的 Checkpoint 选项卡来查看 Checkpoint 过程中各阶段的耗时情况,从而确定到底是哪个阶段导致 Checkpoint 时间过长然后针对性的解决问题。
使用 flink 进行实时计算中,会遇到一些状态数不断累积,导致状态量越来越大的情形。例如,作业中定义了超长的时间窗口,或者在动态表上应用了无限范围的 GROUP BY 语句,以及执行了没有时间窗口限制的双流 JOIN 等等操作。
对于这些情况,经常导致堆内存出现 OOM,或者堆外内存(RocksDB)用量持续增长导致超出容器的配额上限,造成作业的频繁崩溃。
从 Flink 1.6 版本开始引入了 State TTL 特性,该特性可以允许对作业中定义的 Keyed 状态进行超时自动清理,对于Table API 和 SQL 模块引入了空闲状态保留时间(Idle State Retention Time)进行状态管理。
要使用 State TTL 功能,首先要定义一个 StateTtlConfig 对象。State TTL功能所指定的过期时间并不是全局生效的,而是和某个具体的算子状态所绑定。
以下描述了state的构建、配置:过期时间、状态时间戳的更新,对过期数据的处理等内容。
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1)) //过期时间:上次访问的时间 +TTL 超过了当前时间,则表明状态过期了。
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) //状态时间戳更新的时间
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) //已过期但是还未处理的状态怎么处理,NeverReturnExpired:一旦状态过期,则永远不会被返回给调用方
//清理策略:
.cleanupFullSnapshot() //对过期状态不主动处理。默认情况下,过期值只有在显式读出时才会被删除,例如通过调用 ValueState.value() 方法。
.cleanupIncrementally(1024,true)//增量清理,可配置读取若干条记录就执行一次清理,并可指定每次清理多少条失效记录。
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
TTL配置不是check/savepoints的一部分,而是Flink在当前运行的作业中如何处理它的一种方式。
小结:
state TTL 机制,应对通用的状态暴增特别有效。然而,机制不能保证一定可以及时清理掉失效的状态,以及目前仅支持 Processing Time 时间模式等等。
针对 Table API 和 SQL 模块的持续查询/聚合语句,Flink 还提供了另一项失效状态清理机制,这就是 Idle State Retention Time。
如下,官网的例子一个持续查询的分组语句,没有时间窗口的定义,理论上会无限地计算下去,但这里会出现一个问题:随着时间的推移,内存的状态会积累很多,直到状态达到了存储系统的极限,作业崩溃。
SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
针对上面的问题,Flink 提出了空闲状态保留时间(Idle State Retention Time)的概念,如下描述:
通过为每个状态设置Timer,如果这个状态中途被访问过,则重新设置Timer;否则(如果状态一直没有被访问)Timer到期时做状态清理。
这样就可以确保每个状态能够被及时的清理。
streamTableEnvironment.getConfig().setIdleStateRetentionTime(
Time.minutes(idleStateRetentionTime),
Time.of(idleStateRetentionTime * 60 + 5, TimeUnit.MINUTES));
注意:
旧版本 Flink 允许只指定一个参数,表示最早和最晚清理周期相同,但是这样可能会导致同一时间段有很多状态都到期,从而造成瞬间的处理压力。
新版本(1.11)的 Flink 要求两个参数之间的差距至少要达到 5 分钟,从而避免大量状态瞬间到期,对系统造成的冲击。
使用CleanupState 来表示idle state retention time
//状态空闲时间timer的注册
public interface CleanupState {
default void registerProcessingCleanupTimer(
ValueState<Long> cleanupTimeState, //通过ValueState来维护状态清理时间
long currentTime,
long minRetentionTime,
long maxRetentionTime,
TimerService timerService)
throws Exception {
//最近一次要清理状态的时间
Long curCleanupTime = cleanupTimeState.value();
//如果curCleanupTime为空 或 维护的时间+最小的状态空闲时间大于curCleanupTime
if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
//重新注册一个timer,
//此时要注意:如果maxRetentionTime和minRetentionTime的间隔过小,就会频繁的产生timer与更新valuestate,维护timer的成本将会变大。
long cleanupTime = currentTime + maxRetentionTime;
timerService.registerProcessingTimeTimer(cleanupTime);
//如果之前有timer则删除
if (curCleanupTime != null) {
timerService.deleteProcessingTimeTimer(curCleanupTime);
}
//并更新清理时间,用于触发下一次清理
cleanupTimeState.update(cleanupTime);
}
}
}
当数据第一次出现,或者curTime+minRetentionTime超过了最近的清理时间,就用curTime+maxRetentionTime,创建新的Timer,用于触发下一次清理,如果有了过期的timer就删除。
所以如果maxRetentionTime和minRetentionTime的间隔过小,就会频繁的产生timer与更新valuestate,维护timer的成本将会变大。
类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc
我正在使用i18n从头开始构建一个多语言网络应用程序,虽然我自己可以处理一大堆yml文件,但我说的语言(非常)有限,最终我想寻求外部帮助帮助。我想知道这里是否有人在使用UI插件/gem(与django上的django-rosetta不同)来处理多个翻译器,其中一些翻译器不愿意或无法处理存储库中的100多个文件,处理语言数据。谢谢&问候,安德拉斯(如果您已经在rubyonrails-talk上遇到了这个问题,我们深表歉意) 最佳答案 有一个rails3branchofthetolkgem在github上。您可以通过在Gemfi
我需要在客户计算机上运行Ruby应用程序。通常需要几天才能完成(复制大备份文件)。问题是如果启用sleep,它会中断应用程序。否则,计算机将持续运行数周,直到我下次访问为止。有什么方法可以防止执行期间休眠并让Windows在执行后休眠吗?欢迎任何疯狂的想法;-) 最佳答案 Here建议使用SetThreadExecutionStateWinAPI函数,使应用程序能够通知系统它正在使用中,从而防止系统在应用程序运行时进入休眠状态或关闭显示。像这样的东西:require'Win32API'ES_AWAYMODE_REQUIRED=0x0
我有一个模型:classItem项目有一个属性“商店”基于存储的值,我希望Item对象对特定方法具有不同的行为。Rails中是否有针对此的通用设计模式?如果方法中没有大的if-else语句,这是如何干净利落地完成的? 最佳答案 通常通过Single-TableInheritance. 关于ruby-on-rails-Rails-子类化模型的设计模式是什么?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.co
我正在使用的第三方API的文档状态:"[O]urAPIonlyacceptspaddedBase64encodedstrings."什么是“填充的Base64编码字符串”以及如何在Ruby中生成它们。下面的代码是我第一次尝试创建转换为Base64的JSON格式数据。xa=Base64.encode64(a.to_json) 最佳答案 他们说的padding其实就是Base64本身的一部分。它是末尾的“=”和“==”。Base64将3个字节的数据包编码为4个编码字符。所以如果你的输入数据有长度n和n%3=1=>"=="末尾用于填充n%
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
为什么4.1%2返回0.0999999999999996?但是4.2%2==0.2。 最佳答案 参见此处:WhatEveryProgrammerShouldKnowAboutFloating-PointArithmetic实数是无限的。计算机使用的位数有限(今天是32位、64位)。因此计算机进行的浮点运算不能代表所有的实数。0.1是这些数字之一。请注意,这不是与Ruby相关的问题,而是与所有编程语言相关的问题,因为它来自计算机表示实数的方式。 关于ruby-为什么4.1%2使用Ruby返
它不等于主线程的binding,这个toplevel作用域是什么?此作用域与主线程中的binding有何不同?>ruby-e'putsTOPLEVEL_BINDING===binding'false 最佳答案 事实是,TOPLEVEL_BINDING始终引用Binding的预定义全局实例,而Kernel#binding创建的新实例>Binding每次封装当前执行上下文。在顶层,它们都包含相同的绑定(bind),但它们不是同一个对象,您无法使用==或===测试它们的绑定(bind)相等性。putsTOPLEVEL_BINDINGput
我可以得到Infinity和NaNn=9.0/0#=>Infinityn.class#=>Floatm=0/0.0#=>NaNm.class#=>Float但是当我想直接访问Infinity或NaN时:Infinity#=>uninitializedconstantInfinity(NameError)NaN#=>uninitializedconstantNaN(NameError)什么是Infinity和NaN?它们是对象、关键字还是其他东西? 最佳答案 您看到打印为Infinity和NaN的只是Float类的两个特殊实例的字符串
如果您尝试在Ruby中的nil对象上调用方法,则会出现NoMethodError异常并显示消息:"undefinedmethod‘...’fornil:NilClass"然而,有一个tryRails中的方法,如果它被发送到一个nil对象,它只返回nil:require'rubygems'require'active_support/all'nil.try(:nonexisting_method)#noNoMethodErrorexceptionanymore那么try如何在内部工作以防止该异常? 最佳答案 像Ruby中的所有其他对象