草庐IT

hadoop - 由于空间问题导致 Spark 作业失败

coder 2024-01-06 原文

我正在使用 pyspark 在 Spark 中编写批处理程序。 以下是输入文件及其大小

base-track.dat (3.9g)
base-attribute-link.dat (18g)
base-release.dat (543m)

这些是每行一条记录的文本文件,每个字段由一个特殊字符分隔(引用代码)

我正在对属性链接执行一些过滤操作并将它们分组并与其他表连接。

我正在通过 spark-submit 将此程序提交到一个由 Ambari 管理的具有 9 个数据节点的 Hadoop 集群。 每个数据节点包含 140 GB 的 RAM 和 3.5 TB 的磁盘空间。

以下是我的pyspark代码

import sys

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

if __name__ == "__main__":
        sc = SparkContext(appName = "Tracks")
        sqlContext = SQLContext(sc)

        #Load base-track
        track = sc.textFile("base-track/input").map(lambda row: row.split(u'\u0001'))

        #Load base-attribute-link
        attlnk = sc.textFile("base-attribute-link/input").map(lambda row: row.split(u'\u0001'))

        #Load base-release
        release = sc.textFile("base-release/input").map(lambda row: row.split(u'\u0001'))

        attlnk = attlnk.filter(lambda row: row[2] == 'MA0000000162')

        attlnkg = attlnk.groupBy(lambda row: row[1])

        attlnkmax = attlnkg.map( lambda t: (t[0],max([v[4] for v in t[1]])) )

        alg = attlnkmax.map(lambda r: Row(al_objectid=r[0],al_value=r[1]))

        aldf = alg.toDF()

        track = track.map(lambda r:Row(t_tag = r[0], t_trackid= r[1], t_releaseid= r[2], t_songid = r[3], t_med= r[4], t_ph = r[5], t_tn = r[5], t_title= r[5], t_part= r[6], t_dur = r[7], t_pick = r[8], t_amgclid  = r[9], t_amgpopid = r[10], t_compid = r[11], t_muzid = r[12], t_perfid= r[13], t_albumid = r[14]))

        trackdf = track.toDF()

        release = release.map(lambda r:Row(r_tag = r[0], r_relid = r[1], r_albumid = r[2], r_mediafmtid = r[3], r_prodfmtid = r[4], r_reldate = r[5], r_prodcode = r[6], r_prodtypeid = r[7], r_label = r[8], r_relyear = r[9], r_ispurch = r[10], r_amgclassid = r[11], r_amgpopid = r[12], r_eanid = r[13], r_upcid = r[14]))

        releasedf = release.toDF()

        trackaldf = trackdf.join(aldf, trackdf['t_trackid'] == aldf['al_objectid'], 'left_outer')


        tracksdf = trackaldf.join(releasedf, trackaldf['t_releaseid'] == releasedf['r_relid'])

        tracksdf = tracksdf.select('t_trackid', 't_releaseid', 't_songid', 't_med', 't_ph', 't_tn', 't_title', 't_part', 't_dur', 't_pick', 't_amgclid', 't_amgpopid', 't_compid', 't_muzid', 'al_objectid', 't_perfid', 't_albumid', 'r_label')


        tracksdf.rdd.map(lambda x: u"\u0001".join(map(str, x))).coalesce(100).saveAsTextFile("tracks-out")

尝试执行此操作时出现以下错误。

ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-d88c631e-cec3-4b83-8af6-a38b109b5e3b/0e/temp_shuffle_7dbda3ac-48b1-4c4a-89c7-64eb5d858d90
java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:336)
    at org.apache.spark.io.SnappyOutputStreamWrapper.flush(CompressionCodec.scala:209)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.flush(UnsafeRowSerializer.scala:83)
    at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply$mcV$sp(DiskBlockObjectWriter.scala:157)
    at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply(DiskBlockObjectWriter.scala:154)
    at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply(DiskBlockObjectWriter.scala:154)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1239)
    at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:161)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:232)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

有几个关于 SO 的问题,herehere与同一问题有关。

这是我从上面两个问题中尝试过的。 我试图将 spark.yarn.executor.memoryOverhead 从 384 MB 增加到 4GB。

SPARK_JAVA_OPTS+=" -Dspark.local.dir=/mnt/spark,/mnt2/spark -Dhadoop.tmp.dir=/mnt/ephemeral-hdfs"

export SPARK_JAVA_OPTS

第一个没有任何效果。如果我添加 java opts,我会收到/mnt 目录不存在的错误。

在多个论坛(包括 databricks)上阅读了有关此问题的信息后,我有一些模糊的想法,认为此作业正在尝试创建临时文件,作为每个集群节点/tmp 上随机播放的一部分并耗尽空间。在每个集群节点上,我们为 tmp 目录所在的根 (/) 分区分配了 100 GB。

我已经努力了一个多月,通过使用各种 spark 配置参数来执行此操作。作为调整的一部分,我将 spark.driver 和 spark.executor 内存增加到 16g,后来又增加到 64g。还将 spark yarn 执行器内存增加到 4GB。不幸的是,这些都不能解决空间问题。

任何关于如何进一步进行的指导都会有很大帮助。

[Edit-1] 我只是检查所有机器上根目录的磁盘空间,我们集群中的 9 个节点中有 7 个为根目录分配了 100+GB,但在 2 个节点上只分配了 10 GB,他们只剩下 6+GB。这可能是导致磁盘空间问题的原因,如果可以扩展根目录的大小,我将不得不与我们的 IT 团队核实。

[Edit-2] 我与 IT 团队合作将所有机器上的根分区大小扩展到 100+GB,但问题仍然存在,可能是 100GB 的/tmp 空间也不足以胜任这份工作。我估计这项工作的输出大约为 4.6GB。

最佳答案

鉴于您的错误的性质以及您正在对数十 GB 的数据执行大型连接这一事实,其中 spark worker 将在洗牌时将中间数据写入磁盘,100GB 的磁盘似乎不够。我建议为默认的 worker_dir 和 local_dirs 分配更多的磁盘,方法是将它们安装到更大的磁盘或配置更大的根磁盘。另请注意,如果 spark 未正确关闭,则此中间数据​​可能会持续存在并占用工作节点上的大量空间。因此,您可能必须检查这些目录并删除所有陈旧文件。如果您在 AWS r3、c3 或具有大型临时 SSD 磁盘的类似实例类型上运行 spark-standalone,我建议 mounting那些磁盘说“mnt”和“mnt2”和configuring spark scratch space 指向那些挂载,而不是(通常)较小的根卷。例如:

SPARK_LOCAL_DIRS=/mnt
SPARK_WORKER_DIR=/mnt2

关于hadoop - 由于空间问题导致 Spark 作业失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44679244/

有关hadoop - 由于空间问题导致 Spark 作业失败的更多相关文章

  1. ruby-on-rails - 由于 "wkhtmltopdf",PDFKIT 显然无法正常工作 - 2

    我在从html页面生成PDF时遇到问题。我正在使用PDFkit。在安装它的过程中,我注意到我需要wkhtmltopdf。所以我也安装了它。我做了PDFkit的文档所说的一切......现在我在尝试加载PDF时遇到了这个错误。这里是错误:commandfailed:"/usr/local/bin/wkhtmltopdf""--margin-right""0.75in""--page-size""Letter""--margin-top""0.75in""--margin-bottom""0.75in""--encoding""UTF-8""--margin-left""0.75in""-

  2. ruby - 在 64 位 Snow Leopard 上使用 rvm、postgres 9.0、ruby 1.9.2-p136 安装 pg gem 时出现问题 - 2

    我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po

  3. ruby - 通过 rvm 升级 ruby​​gems 的问题 - 2

    尝试通过RVM将RubyGems升级到版本1.8.10并出现此错误:$rvmrubygemslatestRemovingoldRubygemsfiles...Installingrubygems-1.8.10forruby-1.9.2-p180...ERROR:Errorrunning'GEM_PATH="/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/ruby-1.9.2-p180@global:/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/rub

  4. ruby - 通过 RVM (OSX Mountain Lion) 安装 Ruby 2.0.0-p247 时遇到问题 - 2

    我的最终目标是安装当前版本的RubyonRails。我在OSXMountainLion上运行。到目前为止,这是我的过程:已安装的RVM$\curl-Lhttps://get.rvm.io|bash-sstable检查已知(我假设已批准)安装$rvmlistknown我看到当前的稳定版本可用[ruby-]2.0.0[-p247]输入命令安装$rvminstall2.0.0-p247注意:我也试过这些安装命令$rvminstallruby-2.0.0-p247$rvminstallruby=2.0.0-p247我很快就无处可去了。结果:$rvminstall2.0.0-p247Search

  5. ruby - Fast-stemmer 安装问题 - 2

    由于fast-stemmer的问题,我很难安装我想要的任何ruby​​gem。我把我得到的错误放在下面。Buildingnativeextensions.Thiscouldtakeawhile...ERROR:Errorinstallingfast-stemmer:ERROR:Failedtobuildgemnativeextension./System/Library/Frameworks/Ruby.framework/Versions/2.0/usr/bin/rubyextconf.rbcreatingMakefilemake"DESTDIR="cleanmake"DESTDIR=

  6. ruby - 即使失败也继续进行多主机测试 - 2

    我已经构建了一些serverspec代码来在多个主机上运行一组测试。问题是当任何测试失败时,测试会在当前主机停止。即使测试失败,我也希望它继续在所有主机上运行。Rakefile:namespace:specdotask:all=>hosts.map{|h|'spec:'+h.split('.')[0]}hosts.eachdo|host|begindesc"Runserverspecto#{host}"RSpec::Core::RakeTask.new(host)do|t|ENV['TARGET_HOST']=hostt.pattern="spec/cfengine3/*_spec.r

  7. ruby - 安装 Ruby 时遇到问题(无法下载资源 "readline--patch") - 2

    当我尝试安装Ruby时遇到此错误。我试过查看this和this但无济于事➜~brewinstallrubyWarning:YouareusingOSX10.12.Wedonotprovidesupportforthispre-releaseversion.Youmayencounterbuildfailuresorotherbreakages.Pleasecreatepull-requestsinsteadoffilingissues.==>Installingdependenciesforruby:readline,libyaml,makedepend==>Installingrub

  8. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  9. ruby-on-rails - 简单的 Ruby on Rails 问题——如何将评论附加到用户和文章? - 2

    我意识到这可能是一个非常基本的问题,但我现在已经花了几天时间回过头来解决这个问题,但出于某种原因,Google就是没有帮助我。(我认为部分问题在于我是一个初学者,我不知道该问什么......)我也看过O'Reilly的RubyCookbook和RailsAPI,但我仍然停留在这个问题上.我找到了一些关于多态关系的信息,但它似乎不是我需要的(尽管如果我错了请告诉我)。我正在尝试调整MichaelHartl'stutorial创建一个包含用户、文章和评论的博客应用程序(不使用脚手架)。我希望评论既属于用户又属于文章。我的主要问题是:我不知道如何将当前文章的ID放入评论Controller。

  10. 【高数】用拉格朗日中值定理解决极限问题 - 2

    首先回顾一下拉格朗日定理的内容:函数f(x)是在闭区间[a,b]上连续、开区间(a,b)上可导的函数,那么至少存在一个,使得:通过这个表达式我们可以知道,f(x)是函数的主体,a和b可以看作是主体函数f(x)中所取的两个值。那么可以有,  也就意味着我们可以用来替换 这种替换可以用在求某些多项式差的极限中。方法: 外层函数f(x)是一致的,并且h(x)和g(x)是等价无穷小。此时,利用拉格朗日定理,将原式替换为 ,再进行求解,往往会省去复合函数求极限的很多麻烦。使用要注意:1.要先找到主体函数f(x),即外层函数必须相同。2.f(x)找到后,复合部分是等价无穷小。3.要满足作差的形式。如果是加

随机推荐