随着Flink流批一体能力的迅速发展以及Flink SQL易用性的提升,越来越多的厂商开始将Flink作为离线批处理引擎使用。在我们使用Flink进行大规模join操作时,也许会发生如下的异常,导致任务失败:
Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident.
字面意思即为Hash Join的递归次数超出限制。Flink批模式下的join算法有两种,即Hybrid Hash Join和Sort-Merge Join。顾名思义,Hybrid Hash Join就是Simple Hash Join和Grace Hash Join两种算法的结合(关于它们,看官可参考这篇文章)。引用一张Flink官方博客中的手绘图来说明。

Flink的Hybrid Hash Join在build阶段会积极地利用TaskManager的托管内存,并将内存无法容纳的哈希分区spill到磁盘中。在probe阶段,当内存中的哈希分区处理完成后,会释放掉对应的MemorySegment,并将先前溢写到磁盘的分区读入,以提升probe效率。特别注意,如果溢写分区对空闲的托管内存而言仍然过大(特别是存在数据倾斜的情况时),就会将其递归拆分成更小的分区,原理如下图所示。

当然,递归拆分也不能是无限制的。在Blink Runtime中,如果递归拆分3次仍然不能满足内存需求,就会抛出前文所述的异常了。
笔者在今年7月ApacheCon Asia 2022流处理专场的分享内容里谈到了这个问题,并且将其归咎于Flink SQL的CBO优化器的代价模型不太科学,导致其十分偏向选择Hash Join。由于修改的难度很大,所以暂时的workaround就是在任务失败后,自动设置table.exec.disabled-operators参数来禁用掉ShuffleHashJoin算子,从而强制使用Sort-Merge Join。

当然这仍然不算优雅的解决方法,接下来简要看看Flink 1.16版本中提出的更好一点的方案:Adaptive Hash Join。
所谓adaptive(自适应),就是指Hash Join递归超限时,不必让任务失败,而是将这些大分区自动转为Sort-Merge Join来处理。
Blink Runtime中的哈希表有两种,即BinaryHashTable(key的类型为BinaryRowData)和LongHybridHashTable(key的类型为Long)。以前者为例,查看其prepareNextPartition()方法,该方法负责递归地取得下一个要处理的哈希分区。
private boolean prepareNextPartition() throws IOException {
// finalize and cleanup the partitions of the current table
// ......
// there are pending partitions
final BinaryHashPartition p = this.partitionsPending.get(0);
// ......
final int nextRecursionLevel = p.getRecursionLevel() + 1;
if (nextRecursionLevel == 2) {
LOG.info("Recursive hash join: partition number is " + p.getPartitionNumber());
} else if (nextRecursionLevel > MAX_RECURSION_DEPTH) {
LOG.info(
"Partition number [{}] recursive level more than {}, process the partition using SortMergeJoin later.",
p.getPartitionNumber(),
MAX_RECURSION_DEPTH);
// if the partition has spilled to disk more than three times, process it by sort merge
// join later
this.partitionsPendingForSMJ.add(p);
// also need to remove it from pending list
this.partitionsPending.remove(0);
// recursively get the next partition
return prepareNextPartition();
}
// build the next table; memory must be allocated after this call
buildTableFromSpilledPartition(p, nextRecursionLevel);
// set the probe side
setPartitionProbeReader(p);
// unregister the pending partition
this.partitionsPending.remove(0);
this.currentRecursionDepth = p.getRecursionLevel() + 1;
// recursively get the next
return nextMatching();
}
注意当递归深度超过MAX_RECURSION_DEPTH(常量定义即为3)时,会将分区直接放入一个名为partitionsPendingForSMJ的容器中,等待做Sort-Merge Join。另外,在该方法调用的buildTableFromSpilledPartition()方法(对溢写分区执行build操作)开头,去掉了对递归超限的判断,也就是说Hash join exceeded maximum number of recursions异常已经成为历史。
那么等待做Sort-Merge Join的分区是如何被处理的?查看Blink Runtime中的HashJoinOperator算子,在构造该算子时,需要比原来多传入一个SortMergeJoinFunction的实例:
private final SortMergeJoinFunction sortMergeJoinFunction;
SortMergeJoinFunction实际上是将旧版的SortMergeJoinOperator处理逻辑抽离出来的类,算法本身没有任何变化。然后从哈希表中读取前述的partitionsPendingForSMJ容器,对每个分区的build侧和probe侧分别执行Sort-Merge Join操作即可。
/**
* If here also exists partitions which spilled to disk more than three time when hash join end,
* means that the key in these partitions is very skewed, so fallback to sort merge join
* algorithm to process it.
*/
private void fallbackSMJProcessPartition() throws Exception {
if (!table.getPartitionsPendingForSMJ().isEmpty()) {
// release memory to MemoryManager first that is used to sort merge join operator
table.releaseMemoryCacheForSMJ();
// initialize sort merge join operator
LOG.info("Fallback to sort merge join to process spilled partitions.");
initialSortMergeJoinFunction();
fallbackSMJ = true;
for (BinaryHashPartition p : table.getPartitionsPendingForSMJ()) {
// process build side
RowIterator<BinaryRowData> buildSideIter =
table.getSpilledPartitionBuildSideIter(p);
while (buildSideIter.advanceNext()) {
processSortMergeJoinElement1(buildSideIter.getRow());
}
// process probe side
ProbeIterator probeIter = table.getSpilledPartitionProbeSideIter(p);
BinaryRowData probeNext;
while ((probeNext = probeIter.next()) != null) {
processSortMergeJoinElement2(probeNext);
}
}
// close the HashTable
closeHashTable();
// finish build and probe
sortMergeJoinFunction.endInput(1);
sortMergeJoinFunction.endInput(2);
LOG.info("Finish sort merge join for spilled partitions.");
}
}
private void initialSortMergeJoinFunction() throws Exception {
sortMergeJoinFunction.open(
true,
this.getContainingTask(),
this.getOperatorConfig(),
(StreamRecordCollector) this.collector,
this.computeMemorySize(),
this.getRuntimeContext(),
this.getMetricGroup());
}
private void processSortMergeJoinElement1(RowData rowData) throws Exception {
if (leftIsBuild) {
sortMergeJoinFunction.processElement1(rowData);
} else {
sortMergeJoinFunction.processElement2(rowData);
}
}
private void processSortMergeJoinElement2(RowData rowData) throws Exception {
if (leftIsBuild) {
sortMergeJoinFunction.processElement2(rowData);
} else {
sortMergeJoinFunction.processElement1(rowData);
}
}
与BinaryHashTable不同,LongHybridHashTable的join逻辑全部是代码生成的,在对应的生成器LongHashJoinGenerator中,可以看到与上文类似的代码,看官可以自行找来读读。
民那晚安晚安。
我有一个模型:classItem项目有一个属性“商店”基于存储的值,我希望Item对象对特定方法具有不同的行为。Rails中是否有针对此的通用设计模式?如果方法中没有大的if-else语句,这是如何干净利落地完成的? 最佳答案 通常通过Single-TableInheritance. 关于ruby-on-rails-Rails-子类化模型的设计模式是什么?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.co
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
鉴于我有以下迁移:Sequel.migrationdoupdoalter_table:usersdoadd_column:is_admin,:default=>falseend#SequelrunsaDESCRIBEtablestatement,whenthemodelisloaded.#Atthispoint,itdoesnotknowthatusershaveais_adminflag.#Soitfails.@user=User.find(:email=>"admin@fancy-startup.example")@user.is_admin=true@user.save!ende
给定一个复杂的对象层次结构,幸运的是它不包含循环引用,我如何实现支持各种格式的序列化?我不是来讨论实际实现的。相反,我正在寻找可能会派上用场的设计模式提示。更准确地说:我正在使用Ruby,我想解析XML和JSON数据以构建复杂的对象层次结构。此外,应该可以将该层次结构序列化为JSON、XML和可能的HTML。我可以为此使用Builder模式吗?在任何提到的情况下,我都有某种结构化数据-无论是在内存中还是文本中-我想用它来构建其他东西。我认为将序列化逻辑与实际业务逻辑分开会很好,这样我以后就可以轻松支持多种XML格式。 最佳答案 我最
了解Rails缓存如何工作的人可以真正帮助我。这是嵌套在Rails::Initializer.runblock中的代码:config.after_initializedoSomeClass.const_set'SOME_CONST','SOME_VAL'end现在,如果我运行script/server并发出请求,一切都很好。然而,在我的Rails应用程序的第二个请求中,一切都因单元化常量错误而变得糟糕。在生产模式下,我可以成功发出第二个请求,这意味着常量仍然存在。我已通过将以上内容更改为以下内容来解决问题:config.after_initializedorequire'some_cl
我经常迷上ruby的一件事是递归模式。例如,假设我有一个数组,它可能包含无限深度的数组作为元素。所以,例如:my_array=[1,[2,3,[4,5,[6,7]]]]我想创建一个方法,可以将数组展平为[1,2,3,4,5,6,7]。我知道.flatten可以完成这项工作,但这个问题是作为我经常遇到的递归问题的一个例子-因此我试图找到一个更可重用的解决方案。简而言之-我猜这种事情有一个标准模式,但我想不出任何特别优雅的东西。任何想法表示赞赏 最佳答案 递归是一种方法,它不依赖于语言。您在编写算法时要考虑两种情况:再次调用函数的情
这应该是一个简单的问题,但我找不到任何相关信息。给定一个Ruby中的正则表达式,对于每个匹配项,我需要检索匹配的模式$1、$2,但我还需要匹配位置。我知道=~运算符为我提供了第一个匹配项的位置,而string.scan(/regex/)为我提供了所有匹配模式。如果可能,我需要在同一步骤中获得两个结果。 最佳答案 MatchDatastring.scan(regex)do$1#Patternatfirstposition$2#Patternatsecondposition$~.offset(1)#Startingandendingpo
我想开始使用“Sinatra”框架进行编码,但我找不到该框架的“MVC”模式。是“MVC-Sinatra”模式或框架吗? 最佳答案 您可能想查看Padrino这是一个围绕Sinatra构建的框架,可为您的项目提供更“类似Rails”的感觉,但没有那么多隐藏的魔法。这是使用Sinatra可以做什么的一个很好的例子。虽然如果您需要开始使用这很好,但我个人建议您将它用作学习工具,以对您来说最有意义的方式使用Sinatra构建您自己的应用程序。写一些测试/期望,写一些代码,通过测试-重复:)至于ORM,你还应该结帐Sequel其中(imho
有没有一种方法可以自动生成种子数据文件并创建种子数据,就像您在下面链接中的Laravel中看到的那样?LaravelDatabaseMigrations&Seed我在另一个应用程序上看到在Rails的db文件夹下创建了一些带有时间戳的文件,其中包含种子数据。创建它的好方法是什么? 最佳答案 我建议你使用Fabrication的组合gem和Faker.Fabrication允许您编写一个模式来构建您的对象,而Faker为您提供虚假数据,如姓名、电子邮件、电话号码等。这是制造商的样子:Fabricator(:user)dousernam
我有一个交互式RubyonRails应用程序,我想在特定时间将其置于“只读模式”。这将允许用户读取他们需要的数据,但阻止他们执行写入数据库的操作。执行此操作的一种方法是在数据库中放置一个true/false变量,该变量在进行任何写入之前进行检查。我的问题。有没有更优雅的解决方案来解决这个问题? 最佳答案 如果你真的想阻止任何数据库写入,我能想到的最简单的方法是覆盖readonly?始终返回true的模型方法,无论是在选定模型中还是对于所有ActiveRecord模型。如果模型设置为只读(通常通过调用#readonly!来完成),任何