EventTimeTrigger 的触发完全依赖 watermark,换言之,如果 stream 中没有 watermark,就不会触发 EventTimeTrigger。
watermark 之于事件时间就是如此重要,来看一下 watermark 的定义先~
Watermarks 是某个 event time 窗口中所有数据都到齐的标志。
Watermarks 作为数据流的一部分流动并携带时间戳 t,Watermark(t) 断言数据流中不会再有小于时间戳 t 的事件出现。
换言之,当 Watermark(t) 到达时,标志着所有小于时间戳 t 的事件都已到齐,可放心地对时间戳 t 之前的所有事件执行聚合、窗口关闭等动作。
来看一下 《Streaming Systems》书中关于 Watermark 原汁原味的定义:
Watermarks are temporal notions of input completeness in the event-time domain. Worded differently, they are the way the system measures progress and completeness relative to the event times of the records being processed in a stream of events.
That point in event time, E, is the point up to which the system believes all inputs with event times less than E have been observed. In other words, it’s an assertion that no more data with event times less than E will ever be seen again.
如下图所示,watermark 和 event 一样在 pipeline 中流动,并且都携带时间戳,W(4) 表示在此之后不会再收到事件时间小于 4 的事件,W(9) 表示在此之后不会再接收到事件时间小于 9 的事件。
假设我们准备对这个数据流做窗口聚合操作,时间窗口大小为 4 个时间单位,窗口内元素做求和聚合操作。示例代码如下:
input.window(TumblingEventTimeWindows.of(Time.minutes(4L)))
.trigger(EventTimeTrigger.create())
.reduce(new SumReduceFunction());
我们跟踪 Flink 源码看看 EventTimeTrigger 的实现逻辑:
/**
* A Trigger that fires once the watermark passes the end of the window to which a pane belongs.
*/
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private EventTimeTrigger() {}
@Override
public TriggerResult onElement(
Object element, long timestamp, TimeWindow window, TriggerContext ctx)
throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
public static EventTimeTrigger create() {
return new EventTimeTrigger();
}
}
onElement 方法在每次数据进入该 window 时都会触发:首先判断当前的 watermark 是否已经超过了 window 的最大时间(窗口右界时间),如果已经超过,返回触发结果 FIRE;如果尚未超过,则根据窗口右界时间注册一个事件时间定时器,标记触发结果为 CONTINUE。
继续跟踪 registerEventTimeTimer 的动作,发现原来是将定时器放到了一个优先队列 eventTimeTimersQueue 中。优先队列的权重为定时器的时间戳,时间越小越希望先被触发,自然排在队列的前面。
onEventTime 方法是在什么时候被调用呢,一路跟踪来到了 InternalTimerServiceImpl 类,发现只有在 advanceWatermark 的时候会触发 onEventTime,具体逻辑是:当 watermark 到来时,根据 watermark 携带的时间戳 t,从事件时间定时器队列中出队所有时间戳小于 t 的定时器,然后触发 onEventTime。onEventTime 方法的具体实现中,首先比较触发时间是否是自己当前窗口的结束时间,是则 FIRE,否则继续 CONTINUE。
public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {
/** Event time timers that are currently in-flight. */
private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>>
eventTimeTimersQueue;
@Override
public void registerEventTimeTimer(N namespace, long time) {
eventTimeTimersQueue.add(
new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;
InternalTimer<K, N> timer;
while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
eventTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer);
}
}
}
我们以上述示例数据为例,看一下具体的执行过程:
watermark(9) 的到达会促使定时器 Timer(8) 出队,进而 FIRE 窗口 T5-T8。以此类推。
与 EventTimeTrigger 相比,ProcessingTimeTrigger 就相对简单了,它的触发只依赖系统时间。我们跟踪 Flink 源码看看 ProcessingTimeTrigger 的实现逻辑:
/**
* A Trigger that fires once the current system time passes the end of the window to which a pane belongs.
*/
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private ProcessingTimeTrigger() {}
@Override
public TriggerResult onElement(
Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE;
}
/** Creates a new trigger that fires once system time passes the end of the window. */
public static ProcessingTimeTrigger create() {
return new ProcessingTimeTrigger();
}
}
onElement 方法在每次数据进入该 window 时都会触发:直接根据窗口结束时间注册一个处理时间定时器。同样是放到一个处理时间定时器优先队列中。
系统根据系统时间定时地从处理时间定时器队列中取出小于当前系统时间的定时器,然后调用 onProcessingTime 方法,FIRE 窗口。
public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {
/** Processing time timers that are currently in-flight. */
private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>>
@Override
public void registerProcessingTimeTimer(N namespace, long time) {
InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
if (processingTimeTimersQueue.add(
new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
// check if we need to re-schedule our timer to earlier
if (time < nextTriggerTime) {
if (nextTimer != null) {
nextTimer.cancel(false);
}
nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
}
}
}
private void onProcessingTime(long time) throws Exception {
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
nextTimer = null;
InternalTimer<K, N> timer;
while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
processingTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onProcessingTime(timer);
}
if (timer != null && nextTimer == null) {
nextTimer =
processingTimeService.registerTimer(
timer.getTimestamp(), this::onProcessingTime);
}
}
}
欢迎留言、讨论,转载请注明出处。
总的来说,我对ruby还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
作为我的Rails应用程序的一部分,我编写了一个小导入程序,它从我们的LDAP系统中吸取数据并将其塞入一个用户表中。不幸的是,与LDAP相关的代码在遍历我们的32K用户时泄漏了大量内存,我一直无法弄清楚如何解决这个问题。这个问题似乎在某种程度上与LDAP库有关,因为当我删除对LDAP内容的调用时,内存使用情况会很好地稳定下来。此外,不断增加的对象是Net::BER::BerIdentifiedString和Net::BER::BerIdentifiedArray,它们都是LDAP库的一部分。当我运行导入时,内存使用量最终达到超过1GB的峰值。如果问题存在,我需要找到一些方法来更正我的代
Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题
Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack
我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何
我在我的项目中添加了一个系统来重置用户密码并通过电子邮件将密码发送给他,以防他忘记密码。昨天它运行良好(当我实现它时)。当我今天尝试启动服务器时,出现以下错误。=>BootingWEBrick=>Rails3.2.1applicationstartingindevelopmentonhttp://0.0.0.0:3000=>Callwith-dtodetach=>Ctrl-CtoshutdownserverExiting/Users/vinayshenoy/.rvm/gems/ruby-1.9.3-p0/gems/actionmailer-3.2.1/lib/action_mailer
刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr
我想向我的Controller传递一个参数,它是一个简单的复选框,但我不知道如何在模型的form_for中引入它,这是我的观点:{:id=>'go_finance'}do|f|%>Transferirde:para:Entrada:"input",:placeholder=>"Quantofoiganho?"%>Saída:"output",:placeholder=>"Quantofoigasto?"%>Nota:我想做一个额外的复选框,但我该怎么做,模型中没有一个对象,而是一个要检查的对象,以便在Controller中创建一个ifelse,如果没有检查,请帮助我,非常感谢,谢谢
我注意到像bundler这样的项目在每个specfile中执行requirespec_helper我还注意到rspec使用选项--require,它允许您在引导rspec时要求一个文件。您还可以将其添加到.rspec文件中,因此只要您运行不带参数的rspec就会添加它。使用上述方法有什么缺点可以解释为什么像bundler这样的项目选择在每个规范文件中都需要spec_helper吗? 最佳答案 我不在Bundler上工作,所以我不能直接谈论他们的做法。并非所有项目都checkin.rspec文件。原因是这个文件,通常按照当前的惯例,只