为什么你的Flink运行开始减慢了? 为什么你试遍Flink参数还是无法解决? Flink背压常常发生在生产事故中,切记不要掉以轻心。不知为何,最近的我开始走下坡路了。。。
我心里直犯嘀咕: 我又有什么好看的呢?不过是A君你用来换取面包、汽车的工具罢了。虽然陪伴了五年的时光,想来也就是如此~说到这,忘了自我介绍了。我叫Flink,当然,我还是喜欢你们叫我的全名: Apache Flink,因为这样听起来很有科技感。
我是目前最火的大数据实时计算引擎之一。之所以敢这么说,是因为目前我在实时领域确实处于独领风骚的地位,不信请看下面的统计:
此处需要@一下我的老大哥:Apache Spark,我听说一度出现过"Flink的出现,Spark是否慢慢成为鸡肋"的言论。咱也不敢说,也不敢问,对于前辈还是保持尊重和理性。"咳"~ 一声轻咳把我拉回了现实,A君又开始调试代码了~Kafka那里呆了一周,好像是准备搞一个大事情。等到他找到我, 才知道公司准备建设实时数仓。需要我和Kafka兄弟一起加入,处理亿级别实时数据。对于实时数仓我大抵是了解的。再看看A君的老大拿出的架构方案,心中暗喜:这可是到了我的专业领域。
整体架构并不难,很好理解。Kafka兄弟,互相点了点头,我们开始吧~作为老搭档,我和Kafka兄弟配合的很默契,A君也是个老手,于是我们在短短的一周内就出色的完成了初步任务。我可以给你看看我们的部分配合成果:- src.main.scala.com.xxproject.xx
|--handler
|---FlinkODSHandler.scala
|---FlinkDWHandler.scala
|---FlinkADSHandler.scala
...
|--service
|---KafkaSchdulerService.scala
|---SchdulerService.scala
...
|--config/util/model
|---KafkaUtils.scala
|---XXDataModel.scala
...
春风得意马蹄疾~ 此刻的心情舒服极了,我们仨简直就是完美搭档。。可是好景不长。来到第二周,我渐渐的发现自己开始变慢了~具体的表现为 :我有点慌,去看了下自身的状况,结果吓了一大跳:
- 运行开始时正常,到了后面就出现大量Task任务
等待- 少量Task任务开始报
checkpoint超时问题- Kafka数据堆积,无法消费
无论是输入还是输出,缓冲区内存都被占满了。数据处理不过来,barrier流动极为缓慢,大量checkpoint生成时间变长。我发生了背压问题!!!内存频繁告急。转眼间我的Task执行页面已被红色High标识占满~
没有办法,最终我还是向A君发出了告警~A君收到消息,盯着我看了好一会,叹了口气。我觉得有点不好意思,感觉把事情搞砸了。。他没有多说什么,只是问起了我的反压机制,说要从源头解决问题。
下面是A君和我的对话
可以细分两种情况:当前Task任务处理速度慢,比如task任务中调用算法处理等复杂逻辑,导致上游申请不到足够内存。下游Task任务处理速度慢,比如多次collect()输出到下游,导致当前节点无法申请足够的内存。Checkpoint。
Checkpoint时需要进行Barrier对齐,此时若某个Task出现反压,Barrier流动速度会下降,导致Checkpoint变慢甚至超时,任务整体也变慢。
长期或频繁出现反压才需要处理,如果由于网络波动或者GC出现的偶尔反压可以不必处理。
BackPressure详情,找到第一个出现反压的Task。下面这是正常的状况~
BackPressure界面定期
我的内部检测原理
采样Task线程栈信息,统计线程请求内存Buffer的阻塞频率,判断节点是否处于反压状态。0.1显示正常(0.1,0.5) 为LOW,背压轻微0.5为 HIGH,需要注意反压
此时,我指给A君看了目前项目的BackPressure页面,这明显是不正常的状况。
整体流程可类比为生产者->消费者体系。上游生产者发送数据(2M/s)至Send Buffer,途径网络传输(5M/s)到Receive Buffer, 最终下游Consumer消费(<1M/s)。这明显是不行的,下游速度慢于上游速度,数据久积成疾~ 需要做限流。
这很好理解。既然上游处理较快,那么我添加一个限流机制将其速度降下来,让上下游速度基本一致,这样不就解决了吗。。其实不然,这里有几个问题:考虑到这些原因,我的内部提供一种强大的反压机制:
- 我无法提前预估下游实际速度(流速限制设置多少)
- 常碰到网络波动等情况,上下游的流速是
动态变化的

上下游动态反馈,如果下游速度慢,则上游限速;否则上游提速。实现动态自动反压的效果。
上游发送网络数据前经过自身的Network Buffer层,之后往下传输到Channel Buffer层(Netty通道)。最终通过网络传输,层层传递达到下游。Network Buffer、Channel Buffer和Socket Buffer通俗理解就是有关内核态和用户态原理,有兴趣的小伙伴欢迎添加个人微信: 用户态和内核态的区别,处于不同的交换空间和操作系统。
youlong525 进行讨论~
TaskManager维护共享Network BufferPool(Task共享内存池),初始化时向Off-heap Memory中申请内存。Local BufferPool(Task本地内存池),并和Network BufferPool交换内存。Record Writer向 Local BufferPool申请buffer(内存)写数据。如果Local BufferPool没有足够内存则向Network BufferPool申请,使用完之后将申请的内存返回Pool。Netty Buffer拷贝buffer并经过Socket Buffer发送到网络,后续下游端按照相似机制处理。内存不够,则逐层发送反压信号给上游,上游慢慢停止数据发送,直到下游再次恢复。
所以,我的反压机制类似于Java中的阻塞队列,如下图我的内存级的反压工作原理示意。
Task任务通过与Local BufferPool和Network BufferPool协作进行内存申请和释放,同时下游内存使用情况实时反馈给上游,实现动态反压。
A君听完我的回答,陷入了沉思~
数据倾斜和并发。10G左右大小!!再看下分区内的其他值(如图):
发生数据倾斜了~我心里有了底,立马和A君一起找出了这些特殊的Key,进行预聚合打散和数据拆分,再次运行。
感觉有那么一点效果,但是还是有蛮多的高峰值。。并发度,毕竟增加线程数总归会缓解一些计算压力。不甘心的调整了参数之后,结果依然没有太多提升。
并发度。心中不以为然,我刚才可就试过了这个。。好像有点不对劲。。
这就是我要的结果!!我不禁喊了出来。他笑了笑,告诉我这是用到了我的算子链机制。算子链这样做的好处是:
通过将下游算子和上游算子设置相同并发度,可自动形成算子链
整个流程中形成多个算子链,导致线程开销和内存使用率下降。我的反压情况自然也变得缓和了。我不禁大受震撼~~高压的日子彻底结束,此刻尽丝滑~我缓缓吐出一口气,有点欣慰的看着最后的结果:
不自觉地抬头看了眼A君,他也露出了久违的微笑。我是Flink,现在没有压力~本文完。》》》更多好文,请大家关注我的公众号: 大数据兵工厂简而言之错误:NOTE:Gem::SourceIndex#add_specisdeprecated,useSpecification.add_spec.Itwillberemovedonorafter2011-11-01.Gem::SourceIndex#add_speccalledfrom/opt/local/lib/ruby/site_ruby/1.8/rubygems/source_index.rb:91./opt/local/lib/ruby/gems/1.8/gems/rails-2.3.8/lib/rails/gem_dependency.rb:275:in`==':und
我有1.8.6附带的VanillaMacOSXLeopard。我是RoR的新手,所以会学习网上的教程。在使用更高版本的Ruby时,我是否可能会发现遵循它们的问题?我目前正在查看提到1.8.6和1.8.7的这个-http://www.railstutorial.org/book 最佳答案 RoR教程对两者都适用,但如果您正在学习Ruby,则应该学习1.9。Rails3将不支持1.8.6,所以我会选择1.8.7或1.9。我还推荐使用RVM在Ruby版本之间切换。 关于ruby-on-rail
我正在尝试编写一个Ruby扩展,而且我一整天都在成功编译我的nmatrix.so共享对象文件。但是,突然之间,它开始生成nmatrix.bundle,而根本没有任何.so文件。它没有给我任何链接器错误,所以我无法想象为什么会这样。我也没有更改myMakefileorextconf.rb中的任何内容.我一直通过rvm使用Ruby1.9.3p0。我已经尝试对当天的工作执行gitstashsave并编译一些我知道应该没有链接器错误的东西(产生.so的东西)更早。不幸的是,这也会生成一个.bundle文件。很明显,我做了一些事情——可能是无意中安装了一些东西——这改变了一些关键的GCC设置。这
集成背景我们当前集群使用的是ClouderaCDP,Flink版本为ClouderaVersion1.14,整体Flink安装目录以及配置文件结构与社区版本有较大出入。直接根据Streampark官方文档进行部署,将无法配置FlinkHome,以及后续整体Flink任务提交到集群中,因此需要进行针对化适配集成,在满足使用需求上,尽量提供完整的Streampark使用体验。集成步骤版本匹配问题解决首先解决无法识别Cloudera中的FlinkHome问题,根据报错主要明确到的事情是无法读取到Flink版本、lib下面的jar包名称无法匹配。修改对象:修改源码:(解决无法匹配clouderajar
我正在将Rails2应用程序升级到Rails3.2,并且遇到了所谓的惯用语。person.tap|p|做当我用Google搜索这个和itappearstohavebeendeprecatedormoved时.我的理解正确吗?我问是因为我可以在SO上找到它的几个例子。 最佳答案 tap方法已经在Rubysince1.8.7:tap{|x|...}=>objYieldsxtotheblock,andthenreturnsx.Theprimarypurposeofthismethodisto“tapinto”amethodchain,in
我有一个Rails应用程序,我在其中使用delayed_job。我想检测我是否在delayed_job进程中;像ifin_delayed_job?#dosomethingonlyifitisadelayed_jobprocess...else#dosomethingonlyifitisnotadelayed_jobprocess...end但是我不知道怎么办。这是我现在使用的:IN_DELAYED_JOB=beginbasename=File.basename$0arguments=$*rake_args_regex=/\Ajobs:/(basename=='delayed_job')
在3.0之前有一种方法可以做到这一点:#...set:mysql_password,proc{Capistrano::CLI.password_prompt"Gimmeremotedatabaseserverpassword.Don'tworry,Iwon'ttellanyone:"}#...namespace:dbdodesc'Dumpremotedatabase'task:dumpdorun"mysqldump-u#{mysql_user}-p#{mysql_database}>~/#{mysql_database}.sql"do|channel,stream,data|ifdat
如何切换回来?我安装了RVM,然后安装了Ruby1.9.3,然后运行geminstallrails。运行rails-v我可以看到我有Rails4.0.0.rc1但我不想使用该版本,因为我的主机不支持它供应商。如何安装Rails3.2.13并在运行railsnew命令时将其用作默认值? 最佳答案 尝试:gemuninstallrailsgeminstallrails--version"=3.2.13" 关于ruby-on-rails-使用geminstallrails,现在我有4.0.
文章目录使用flinksqlclientonyarnsession模式Per-JobCluster模式flinkrunflinkrunapplication-tyarn-application配置任务退出时保留Checkpoint从外部checkpoint恢复应用资料使用安装完hadoop3.3.4之后,启动hadoop、yarn将flink1.14.6上传到各个服务器节点,解压flinksqlclientonyarnhttps://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/overview/Appli
我对所有事情都使用rescue,而不仅仅是“拯救”异常。我的意思是,我只是喜欢它省去验证和双重检查数据的方式。例如,假设我有一个模型Item,它可能有也可能没有User。然后,当我想获得我写的元素的所有者姓名时:item.user.namerescue""而不是类似的东西item.user.nil??"":item.user.name它产生了同样的想法,因为nil.name触发了我用""挽救的异常,但我不太确定这是一个好习惯。它实现了我想要的,并且用更少的代码实现了,但是...我不知道,到处都是rescue字眼让我感到不安全。这是一种不好的做法还是滥用rescue关键字是否有效?