草庐IT

Java Apache Spark : Long transformation chains result in quadratic time

coder 2024-03-28 原文

我有一个使用 Apache Spark 的 Java 程序。该程序最有趣的部分如下所示:

long seed = System.nanoTime();

JavaRDD<AnnotatedDocument> annotated = documents
    .mapPartitionsWithIndex(new InitialAnnotater(seed), true);
annotated.cache();

for (int iter = 0; iter < 2000; iter++) {
    GlobalCounts counts = annotated
        .mapPartitions(new GlobalCounter())
        .reduce((a, b) -> a.sum(b)); // update overall counts (*)

    seed = System.nanoTime();

    // copy overall counts which CountChanger uses to compute a stochastic thing (**)
    annotated = annotated
        .mapPartitionsWithIndex(new CountChanger(counts, seed),  true); 
    annotated.cache();

    // adding these lines causes constant time complexity like i want
    //List<AnnotatedDocument> ll = annotated.collect();
    //annotated = sc.parallelize(ll, 8); 
}

所以实际上,行 (**) 产生了一个 RDD 的形式

documents
    .mapPartitionsWithIndex(initial)
    .mapPartitionsWithIndex(nextIter)
    .mapPartitionsWithIndex(nextIter)
    .mapPartitionsWithIndex(nextIter)
    ... 2000 more

确实是一个很长的 map 链。此外,行 (*) 在每次迭代时强制计算(非惰性),因为需要更新计数。

我遇到的问题是我得到的时间复杂度随着每次迭代而线性增加,因此总体上呈二次方:

我认为这是因为 Spark 试图“记住”链中的每个 RDD,以及容错算法或导致其增长的任何因素。但是,我真的不知道。

我真正想做的是在每次迭代时告诉 Spark “折叠”RDD,以便只有最后一个保留在内存中并继续处理。我认为这应该导致每次迭代的时间恒定。这可能吗?还有其他解决方案吗?

谢谢!

最佳答案

尝试使用 rdd.checkpoint。这会将 RDD 保存到 hdfs 并清除沿袭。

每次转换 RDD 时,都会增加沿袭,Spark 必须跟踪可用的内容以及必须重新计算的内容。处理 DAG 非常昂贵,而且大型 DAG 往往会很快降低性能。通过“检查点”,您指示 Spark 计算并保存生成的 RDD,并丢弃有关其创建方式的信息。这使得它类似于简单地保存一个 RDD 并读回它,从而最大限度地减少 DAG 操作。

旁注,由于您遇到了这个问题,很高兴知道 union 还通过添加 steps 影响 RDD 性能,并且还可能抛出 StackOverflowError 由于沿袭信息的方式。 See this post

This link有更多详细信息和漂亮的图表,还提到了主题 in this SO post .

关于Java Apache Spark : Long transformation chains result in quadratic time,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36127689/

有关Java Apache Spark : Long transformation chains result in quadratic time的更多相关文章

随机推荐