突然间钉钉群出现告警,发现Flink集群中跑的所有Job进入Restarting。业务中断,影响比较大,需要迅速定位恢复。
1、查看flink集群、yarn集群、hdfs集群的运行状态,发现运行正常。
2、查看taskmanager日志文件发下异常:
2021-12-29 09:07:11,465 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Filter (1/1)#35 (ba2fe0076320a3ee8f7e7b2f471feab5) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException
此异常是在task运行的时候抛出的,可以从异常日志看到问题出现在Source的Filter方法中,也就是我们自己写的filter代码,查看Filter的具体代码发现存在空指针异常问题。对存在空值的地方进行非空判断,重启后Job恢复。
Job恢复后,我们进行了深入分析:由于最近我们也没有新协议的接入,而且数据格式都是规整过的数据。排除是消息格式引起的问题。深入看下代码,发现报错的地方,算子filter方法中变量参数dataAvailableTime是直接使用的主方法的变量。代码如下:
main...
String dataAvailableTime=...;
DataStream<Tuple2<String, String>> kafkaDataStream = env.addSource(myConsumer).filter(new FilterFunction<Tuple2<String, String>>() {
@Override
public boolean filter(Tuple2<String, String> stringStringTuple2) throws Exception {
//校验数据request_time是否还在有效期
boolean res = DtoTransferUtil.validateBasic(stringStringTuple2.f1,dataAvailableTime);
return res;
}
});
内部类中dataAvailableTime变量直接使用的主类的变量。在flink中有job manager和task manager之分。其中,job manager负责job的调度,task manager负责任务的执行。我们通过Flink Api编写流式代码的时候,flink会将你编写的这些流式代码构建成StreamGraph。StreamGraph是基于有向无环图来实现的,有向无环图能很好的解决任务调度间的依赖。然后再将StreamGraph进行执行优化,生成JobGraph。最后将生成的JobGraph提交给JobManager,由JobManager负责任务的分发和调度。


JobManager和TaskManager在flink中是单独的进程,所有分布式任务执行系统,基本都是通过RPC(Remote Procedure Call)远程过程调用来进行进程间方法调用,类似调用本地的方法。flink是基于Akka框架来实现RPC。
类路径:
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
// Make sure the user code classloader is accessible thread-locally.
// We are setting the correct context class loader before instantiating the invokable
// so that it is available to the invokable during its entire lifetime.
executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader());
// When constructing invokable, separate threads can be constructed and thus should be
// monitored for system exit (in addition to invoking thread itself monitored below).
FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
try {
// now load and instantiate the task's invokable code
invokable =
loadAndInstantiateInvokable(
userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
} finally {
FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
}
// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------
// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;
通过userCodeClassLoade加载user code,然后进行执行。从这一点上可以看出,如果在算子中直接使用外部的变量,这样执行是有问题的。Job提交后,job manager接收到提交的job,开始执行job main方法。相应的算子task会被调度到不同的task manager上运行。都是不同的进程,task manager在执行算子的时候,是获取不到job main方法中的参数的。所以我们需要采用正确的方式传值。如何正确的传参数,可以参考下我另一篇文章。
我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po
尝试通过RVM将RubyGems升级到版本1.8.10并出现此错误:$rvmrubygemslatestRemovingoldRubygemsfiles...Installingrubygems-1.8.10forruby-1.9.2-p180...ERROR:Errorrunning'GEM_PATH="/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/ruby-1.9.2-p180@global:/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/rub
exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby中使用两个参数异步运行exe吗?我已经尝试过ruby命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何rubygems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除
我有一些Ruby代码,如下所示:Something.createdo|x|x.foo=barend我想编写一个测试,它使用double代替block参数x,这样我就可以调用:x_double.should_receive(:foo).with("whatever").这可能吗? 最佳答案 specify'something'dox=doublex.should_receive(:foo=).with("whatever")Something.should_receive(:create).and_yield(x)#callthere
我正在为一个项目制作一个简单的shell,我希望像在Bash中一样解析参数字符串。foobar"helloworld"fooz应该变成:["foo","bar","helloworld","fooz"]等等。到目前为止,我一直在使用CSV::parse_line,将列分隔符设置为""和.compact输出。问题是我现在必须选择是要支持单引号还是双引号。CSV不支持超过一个分隔符。Python有一个名为shlex的模块:>>>shlex.split("Test'helloworld'foo")['Test','helloworld','foo']>>>shlex.split('Test"
我的最终目标是安装当前版本的RubyonRails。我在OSXMountainLion上运行。到目前为止,这是我的过程:已安装的RVM$\curl-Lhttps://get.rvm.io|bash-sstable检查已知(我假设已批准)安装$rvmlistknown我看到当前的稳定版本可用[ruby-]2.0.0[-p247]输入命令安装$rvminstall2.0.0-p247注意:我也试过这些安装命令$rvminstallruby-2.0.0-p247$rvminstallruby=2.0.0-p247我很快就无处可去了。结果:$rvminstall2.0.0-p247Search
我不确定传递给方法的对象的类型是否正确。我可能会将一个字符串传递给一个只能处理整数的函数。某种运行时保证怎么样?我看不到比以下更好的选择:defsomeFixNumMangler(input)raise"wrongtype:integerrequired"unlessinput.class==FixNumother_stuffend有更好的选择吗? 最佳答案 使用Kernel#Integer在使用之前转换输入的方法。当无法以任何合理的方式将输入转换为整数时,它将引发ArgumentError。defmy_method(number)
由于fast-stemmer的问题,我很难安装我想要的任何rubygem。我把我得到的错误放在下面。Buildingnativeextensions.Thiscouldtakeawhile...ERROR:Errorinstallingfast-stemmer:ERROR:Failedtobuildgemnativeextension./System/Library/Frameworks/Ruby.framework/Versions/2.0/usr/bin/rubyextconf.rbcreatingMakefilemake"DESTDIR="cleanmake"DESTDIR=
两者都可以defsetup(options={})options.reverse_merge:size=>25,:velocity=>10end和defsetup(options={}){:size=>25,:velocity=>10}.merge(options)end在方法的参数中分配默认值。问题是:哪个更好?您更愿意使用哪一个?在性能、代码可读性或其他方面有什么不同吗?编辑:我无意中添加了bang(!)...并不是要询问nobang方法与bang方法之间的区别 最佳答案 我倾向于使用reverse_merge方法:option
我有一个只接受一个参数的方法:defmy_method(number)end如果使用number调用方法,我该如何引发错误??通常,我如何定义方法参数的条件?比如我想在调用的时候报错:my_method(1) 最佳答案 您可以添加guard在函数的开头,如果参数无效则引发异常。例如:defmy_method(number)failArgumentError,"Inputshouldbegreaterthanorequalto2"ifnumbereputse.messageend#=>Inputshouldbegreaterthano