草庐IT

【状态管理|概述】Flink的状态管理:为什么需要state、怎么保存state、对于state过大怎么处理

roman_日积跬步-终至千里 2023-04-11 原文

文章目录

一. state相关

1. state种类

按照数据的划分和扩张方式,Flink中大致分为2类:

  • Keyed States:记录每个Key对应的状态值

因为一个任务的并行度有多少,就会有多少个子任务,当key的范围大于并行度时,就会出现一个subTask上可能包含多个Key(),但不同Task上不会出现相同的Key(解决了shuffle的问题?)
 
常用的 MapState、ValueState。

  • Operator States:记录每个Task对应的状态值数据类型。

 

2. State的存在形式

Keyed State 和 Operator State 存在两种形式:managed (托管状态)和 raw(原始状态)。

  • 托管状态是由Flink框架管理的状态,原始状态是由用户自行管理状态的具体数据结构。
  • 通常所有的 datastream functions 都可以使用托管状态,但是原始状态接口仅仅能够在实现 operators的时候使用。
  • 推荐使用 managed state 而不是使用 raw state,因为使用托管状态的时候 Flink 可以在 parallelism 发生改变的情况下能够动态重新分配状态,而且还能更好的进行内存管理。

 

3. state在哪产生

没有状态的操作

从概念上讲, 源表从来不会在状态中被完全保存。 形如 SELECT … FROM … WHERE
这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。

诸如 join、 聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。看下sum的状态操作

@Internal
public class StreamGroupedReduceOperator<IN>
        extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
        implements OneInputStreamOperator<IN, IN> {

    private static final long serialVersionUID = 1L;

    private static final String STATE_NAME = "_op_state";

    private transient ValueState<IN> values;

    private final TypeSerializer<IN> serializer;

    public StreamGroupedReduceOperator(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
        super(reducer);
        this.serializer = serializer;
    }

    @Override
    public void open() throws Exception {
        super.open();
        ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer);
        //获得value state
        values = getPartitionedState(stateId);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        IN value = element.getValue();
        IN currentValue = values.value();
        //如果currentValue不为null,则说明不是第一次启动,也就是在hdfs上已经存储了中间状态 
        if (currentValue != null) {
            //先做一个聚合,然后再更新,之后输出到下游
            IN reduced = userFunction.reduce(currentValue, value);
            values.update(reduced);
            output.collect(element.replace(reduced));
        } else {
            //第一次启动直接更新数据,之后输出到下游
            values.update(value);
            output.collect(element.replace(value));
        }
    }
}

 

4. state 内存设置

从 Flink1.10 开始,Flink 默认将 state 内存大小配置为每个 task slot 的托管内存。

调试内存性能的问题主要是通过调整配置项,来提高Flink的托管内存:

taskmanager.memory.managed.size 
//推荐使用比例计算
taskmanager.memory.managed.fraction 

具体调优案例分析可见:Flink on yarn双流join问题分析+性能调优思路

 
 

二. state backend

Flink状态后端主要负责两件事:本地的状态管理、将检查点(checkpoint)状态写入远程存储。

flink state可以存储在java堆内存内或者内存之外。

默认情况下,使用MemoryStateBackend,Flink的state会保存在taskManager的内存中,而checkpoint会保存在jobManager的内存中。

 

1. 三种状态后端

flink提供三种开箱即用的State Backend:

状态后端数据存储容量限制场景
MemoryStateBackend
State:TaskManager 内存中
Checkpoint:存储在jobManager 内存
单个State maxStateSize默认为5M
maxStateSize <= akka.frame.size默认10M
Checkpoint总大小不能超过JobMananger的内存
本地测试
状态比较少的作业
不推荐生产环境中使用
FsStateBackend
State:TaskManager 内存
Checkpoint:外部文件系统(本地或HDFS)
单个TaskManager上State总量不能超过TM内存
总数据大小不超过文件系统容量
窗口时间比较长,如分钟级别窗口聚合,Join等
需要开启HA的作业
可在生产环境中使用
RocksDBStateBackend
将所有的状态序列化之后, 存入本地的 RocksDB 数据库中.(一种 NoSql 数 据库, KV 形式存储)
State: TaskManager 中的KV数据库(实际使用内存+磁盘)
Checkpoint:外部文件系统(本地或HDFS)
单TaskManager 上 State总量不超过其内存+磁盘大小,单 Key最大容量2G
总大小不超过配置的文件系统容量
超大状态作业
需要开启HA的
作业生产环境可用

 

2. 如何在hdfs中存储?

Keyed States 和 Operator States 会存储在一个带有编号的 chk 目录中,比如说一个 flink 任务的 Keyed States 的 subTask 个数是4,Operator States 对应的 subTask 也是 4,那么 chk 会存一个元数据文件 _metadata ,四个 Keyed States 文件,四个 Operator States 的文件

也就是说 Keyed States 和 Operator States 会分别存储 subTask 总数个状态文件。

 

3. 设置checkpoint

一般需求,我们的 Checkpoint 时间间隔可以设置为分钟级别(1-5 分钟)。

3.1. 大状态下设置checkpoint

对于状态很大的任务每次 Checkpoint 访问 HDFS 比较耗时,可以设置为 5~10 分钟一次Checkpoint,并且调大两次 Checkpoint 之间的暂停间隔,例如设置两次 Checkpoint 之 间至少暂停 4 或 8 分钟。

具体案例分析可见:Flink on yarn双流join问题分析+性能调优思路

 

3.2. EXACTLY_ONCE下设置分析checkpoint

如果 Checkpoint 语义配置为 EXACTLY_ONCE,那么在 Checkpoint 过程中还会存在 barrier 对齐的过程,可以通过 Flink Web UI 的 Checkpoint 选项卡来查看 Checkpoint 过程中各阶段的耗时情况,从而确定到底是哪个阶段导致 Checkpoint 时间过长然后针对性的解决问题。

 
 

三. State设置过期时间

使用 flink 进行实时计算中,会遇到一些状态数不断累积,导致状态量越来越大的情形。例如,作业中定义了超长的时间窗口,或者在动态表上应用了无限范围的 GROUP BY 语句,以及执行了没有时间窗口限制的双流 JOIN 等等操作。

对于这些情况,经常导致堆内存出现 OOM,或者堆外内存(RocksDB)用量持续增长导致超出容器的配额上限,造成作业的频繁崩溃。

从 Flink 1.6 版本开始引入了 State TTL 特性,该特性可以允许对作业中定义的 Keyed 状态进行超时自动清理,对于Table API 和 SQL 模块引入了空闲状态保留时间(Idle State Retention Time)进行状态管理。

 

1. datastream的TTL

要使用 State TTL 功能,首先要定义一个 StateTtlConfig 对象。State TTL功能所指定的过期时间并不是全局生效的,而是和某个具体的算子状态所绑定。

以下描述了state的构建、配置:过期时间、状态时间戳的更新,对过期数据的处理等内容。

 
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1)) //过期时间:上次访问的时间 +TTL 超过了当前时间,则表明状态过期了。
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) //状态时间戳更新的时间
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) //已过期但是还未处理的状态怎么处理,NeverReturnExpired:一旦状态过期,则永远不会被返回给调用方
    //清理策略:
    .cleanupFullSnapshot() //对过期状态不主动处理。默认情况下,过期值只有在显式读出时才会被删除,例如通过调用 ValueState.value() 方法。
    .cleanupIncrementally(1024,true)//增量清理,可配置读取若干条记录就执行一次清理,并可指定每次清理多少条失效记录。
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

TTL配置不是check/savepoints的一部分,而是Flink在当前运行的作业中如何处理它的一种方式。

 
小结:

state TTL 机制,应对通用的状态暴增特别有效。然而,机制不能保证一定可以及时清理掉失效的状态,以及目前仅支持 Processing Time 时间模式等等。

 
 

2.Table API和SQL的状态管理

针对 Table API 和 SQL 模块的持续查询/聚合语句,Flink 还提供了另一项失效状态清理机制,这就是 Idle State Retention Time。

2.1. 问题描述与分析

如下,官网的例子一个持续查询的分组语句,没有时间窗口的定义,理论上会无限地计算下去,但这里会出现一个问题:随着时间的推移,内存的状态会积累很多,直到状态达到了存储系统的极限,作业崩溃。

SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;

针对上面的问题,Flink 提出了空闲状态保留时间(Idle State Retention Time)的概念,如下描述:

通过为每个状态设置Timer,如果这个状态中途被访问过,则重新设置Timer;否则(如果状态一直没有被访问)Timer到期时做状态清理。

这样就可以确保每个状态能够被及时的清理。

 

2.2. 状态设置

streamTableEnvironment.getConfig().setIdleStateRetentionTime(
					Time.minutes(idleStateRetentionTime),
                    Time.of(idleStateRetentionTime * 60 + 5, TimeUnit.MINUTES));

注意:

旧版本 Flink 允许只指定一个参数,表示最早和最晚清理周期相同,但是这样可能会导致同一时间段有很多状态都到期,从而造成瞬间的处理压力。
 
新版本(1.11)的 Flink 要求两个参数之间的差距至少要达到 5 分钟,从而避免大量状态瞬间到期,对系统造成的冲击

 

2.3. 实现逻辑与源码分析

使用CleanupState 来表示idle state retention time

//状态空闲时间timer的注册
public interface CleanupState {
    default void registerProcessingCleanupTimer(
            ValueState<Long> cleanupTimeState, //通过ValueState来维护状态清理时间
            long currentTime,
            long minRetentionTime,
            long maxRetentionTime,
            TimerService timerService)
            throws Exception {
        //最近一次要清理状态的时间
        Long curCleanupTime = cleanupTimeState.value();
 
        //如果curCleanupTime为空 或 维护的时间+最小的状态空闲时间大于curCleanupTime 
        if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
           //重新注册一个timer,
           //此时要注意:如果maxRetentionTime和minRetentionTime的间隔过小,就会频繁的产生timer与更新valuestate,维护timer的成本将会变大。
            long cleanupTime = currentTime + maxRetentionTime;
            timerService.registerProcessingTimeTimer(cleanupTime);
            //如果之前有timer则删除
            if (curCleanupTime != null) {
                timerService.deleteProcessingTimeTimer(curCleanupTime);
            }
            //并更新清理时间,用于触发下一次清理
            cleanupTimeState.update(cleanupTime);
        }
    }
}

当数据第一次出现,或者curTime+minRetentionTime超过了最近的清理时间,就用curTime+maxRetentionTime,创建新的Timer,用于触发下一次清理,如果有了过期的timer就删除。
所以如果maxRetentionTime和minRetentionTime的间隔过小,就会频繁的产生timer与更新valuestate,维护timer的成本将会变大。

 
 
 
参考:
Flink 状态管理详解(State TTL、Operator state、Keyed state)

有关【状态管理|概述】Flink的状态管理:为什么需要state、怎么保存state、对于state过大怎么处理的更多相关文章

  1. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  2. ruby - i18n Assets 管理/翻译 UI - 2

    我正在使用i18n从头开始​​构建一个多语言网络应用程序,虽然我自己可以处理一大堆yml文件,但我说的语言(非常)有限,最终我想寻求外部帮助帮助。我想知道这里是否有人在使用UI插件/gem(与django上的django-rosetta不同)来处理多个翻译器,其中一些翻译器不愿意或无法处理存储库中的100多个文件,处理语言数据。谢谢&问候,安德拉斯(如果您已经在ruby​​onrails-talk上遇到了这个问题,我们深表歉意) 最佳答案 有一个rails3branchofthetolkgem在github上。您可以通过在Gemfi

  3. ruby - 在 Ruby 程序执行时阻止 Windows 7 PC 进入休眠状态 - 2

    我需要在客户计算机上运行Ruby应用程序。通常需要几天才能完成(复制大备份文件)。问题是如果启用sleep,它会中断应用程序。否则,计算机将持续运行数周,直到我下次访问为止。有什么方法可以防止执行期间休眠并让Windows在执行后休眠吗?欢迎任何疯狂的想法;-) 最佳答案 Here建议使用SetThreadExecutionStateWinAPI函数,使应用程序能够通知系统它正在使用中,从而防止系统在应用程序运行时进入休眠状态或关闭显示。像这样的东西:require'Win32API'ES_AWAYMODE_REQUIRED=0x0

  4. ruby-on-rails - Rails - 子类化模型的设计模式是什么? - 2

    我有一个模型:classItem项目有一个属性“商店”基于存储的值,我希望Item对象对特定方法具有不同的行为。Rails中是否有针对此的通用设计模式?如果方法中没有大的if-else语句,这是如何干净利落地完成的? 最佳答案 通常通过Single-TableInheritance. 关于ruby-on-rails-Rails-子类化模型的设计模式是什么?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.co

  5. ruby - 什么是填充的 Base64 编码字符串以及如何在 ruby​​ 中生成它们? - 2

    我正在使用的第三方API的文档状态:"[O]urAPIonlyacceptspaddedBase64encodedstrings."什么是“填充的Base64编码字符串”以及如何在Ruby中生成它们。下面的代码是我第一次尝试创建转换为Base64的JSON格式数据。xa=Base64.encode64(a.to_json) 最佳答案 他们说的padding其实就是Base64本身的一部分。它是末尾的“=”和“==”。Base64将3个字节的数据包编码为4个编码字符。所以如果你的输入数据有长度n和n%3=1=>"=="末尾用于填充n%

  6. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  7. ruby - 为什么 4.1%2 使用 Ruby 返回 0.0999999999999996?但是 4.2%2==0.2 - 2

    为什么4.1%2返回0.0999999999999996?但是4.2%2==0.2。 最佳答案 参见此处:WhatEveryProgrammerShouldKnowAboutFloating-PointArithmetic实数是无限的。计算机使用的位数有限(今天是32位、64位)。因此计算机进行的浮点运算不能代表所有的实数。0.1是这些数字之一。请注意,这不是与Ruby相关的问题,而是与所有编程语言相关的问题,因为它来自计算机表示实数的方式。 关于ruby-为什么4.1%2使用Ruby返

  8. ruby - ruby 中的 TOPLEVEL_BINDING 是什么? - 2

    它不等于主线程的binding,这个toplevel作用域是什么?此作用域与主线程中的binding有何不同?>ruby-e'putsTOPLEVEL_BINDING===binding'false 最佳答案 事实是,TOPLEVEL_BINDING始终引用Binding的预定义全局实例,而Kernel#binding创建的新实例>Binding每次封装当前执行上下文。在顶层,它们都包含相同的绑定(bind),但它们不是同一个对象,您无法使用==或===测试它们的绑定(bind)相等性。putsTOPLEVEL_BINDINGput

  9. ruby - Infinity 和 NaN 的类型是什么? - 2

    我可以得到Infinity和NaNn=9.0/0#=>Infinityn.class#=>Floatm=0/0.0#=>NaNm.class#=>Float但是当我想直接访问Infinity或NaN时:Infinity#=>uninitializedconstantInfinity(NameError)NaN#=>uninitializedconstantNaN(NameError)什么是Infinity和NaN?它们是对象、关键字还是其他东西? 最佳答案 您看到打印为Infinity和NaN的只是Float类的两个特殊实例的字符串

  10. ruby-on-rails - 如果 Object::try 被发送到一个 nil 对象,为什么它会起作用? - 2

    如果您尝试在Ruby中的nil对象上调用方法,则会出现NoMethodError异常并显示消息:"undefinedmethod‘...’fornil:NilClass"然而,有一个tryRails中的方法,如果它被发送到一个nil对象,它只返回nil:require'rubygems'require'active_support/all'nil.try(:nonexisting_method)#noNoMethodErrorexceptionanymore那么try如何在内部工作以防止该异常? 最佳答案 像Ruby中的所有其他对象

随机推荐