我在 apache spark 中遇到了一个奇怪的问题,我将不胜感激。 从 hdfs 读取数据(并进行一些从 json 到对象的转换)后,下一阶段(处理所述对象)在处理完 2 个分区(总共 512 个)后失败。 这种情况发生在大型数据集上(我注意到的最小数据集约为 700 兆,但可能会更低,我还没有缩小范围)。
编辑:700 megs 是 tgz 文件大小,未压缩是 6 gigs。
编辑 2:同样的事情发生在 spark 1.1.0
我在一台 32 核、60 演出的机器上使用本地主机运行 spark,设置如下:
spark.akka.timeout = 200
spark.shuffle.consolidateFiles = true
spark.kryoserializer.buffer.mb = 128
spark.reducer.maxMbInFlight = 128
具有 16 gig 执行程序堆大小。内存没有用尽,CPU 负载可以忽略不计。 Spark 永远挂起。
下面是 Spark 日志:
14/09/11 10:19:52 INFO HadoopRDD: Input split: hdfs://localhost:9000/spew/data/json.lines:6351070299+12428842
14/09/11 10:19:53 INFO Executor: Serialized size of result for 511 is 1263
14/09/11 10:19:53 INFO Executor: Sending result for 511 directly to driver
14/09/11 10:19:53 INFO Executor: Finished task ID 511
14/09/11 10:19:53 INFO TaskSetManager: Finished TID 511 in 868 ms on localhost (progress: 512/512)
14/09/11 10:19:53 INFO DAGScheduler: Completed ShuffleMapTask(3, 511)
14/09/11 10:19:53 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
14/09/11 10:19:53 INFO DAGScheduler: Stage 3 (mapToPair at Main.java:205) finished in 535.874 s
14/09/11 10:19:53 INFO DAGScheduler: looking for newly runnable stages
14/09/11 10:19:53 INFO DAGScheduler: running: Set()
14/09/11 10:19:53 INFO DAGScheduler: waiting: Set(Stage 0, Stage 1, Stage 2)
14/09/11 10:19:53 INFO DAGScheduler: failed: Set()
14/09/11 10:19:53 INFO DAGScheduler: Missing parents for Stage 0: List(Stage 1)
14/09/11 10:19:53 INFO DAGScheduler: Missing parents for Stage 1: List(Stage 2)
14/09/11 10:19:53 INFO DAGScheduler: Missing parents for Stage 2: List()
14/09/11 10:19:53 INFO DAGScheduler: Submitting Stage 2 (FlatMappedRDD[10] at flatMapToPair at Driver.java:145), which is now runnable
14/09/11 10:19:53 INFO DAGScheduler: Submitting 512 missing tasks from Stage 2 (FlatMappedRDD[10] at flatMapToPair at Driver.java:145)
14/09/11 10:19:53 INFO TaskSchedulerImpl: Adding task set 2.0 with 512 tasks
14/09/11 10:19:53 INFO TaskSetManager: Starting task 2.0:0 as TID 512 on executor localhost: localhost (PROCESS_LOCAL)
14/09/11 10:19:53 INFO TaskSetManager: Serialized task 2.0:0 as 3469 bytes in 0 ms
14/09/11 10:19:53 INFO Executor: Running task ID 512
14/09/11 10:19:53 INFO BlockManager: Found block broadcast_0 locally
14/09/11 10:19:53 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 134217728, targetRequestSize: 26843545
14/09/11 10:19:53 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 512 non-empty blocks out of 512 blocks
14/09/11 10:19:53 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 6 ms
14/09/11 10:20:07 INFO Executor: Serialized size of result for 512 is 1479
14/09/11 10:20:07 INFO Executor: Sending result for 512 directly to driver
14/09/11 10:20:07 INFO Executor: Finished task ID 512
14/09/11 10:20:07 INFO TaskSetManager: Starting task 2.0:1 as TID 513 on executor localhost: localhost (PROCESS_LOCAL)
14/09/11 10:20:07 INFO TaskSetManager: Serialized task 2.0:1 as 3469 bytes in 0 ms
14/09/11 10:20:07 INFO Executor: Running task ID 513
14/09/11 10:20:07 INFO TaskSetManager: Finished TID 512 in 13996 ms on localhost (progress: 1/512)
14/09/11 10:20:07 INFO DAGScheduler: Completed ShuffleMapTask(2, 0)
14/09/11 10:20:07 INFO BlockManager: Found block broadcast_0 locally
14/09/11 10:20:07 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 134217728, targetRequestSize: 26843545
14/09/11 10:20:07 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 512 non-empty blocks out of 512 blocks
14/09/11 10:20:07 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms
14/09/11 10:20:15 INFO Executor: Serialized size of result for 513 is 1479
14/09/11 10:20:15 INFO Executor: Sending result for 513 directly to driver
14/09/11 10:20:15 INFO Executor: Finished task ID 513
14/09/11 10:20:15 INFO TaskSetManager: Starting task 2.0:2 as TID 514 on executor localhost: localhost (PROCESS_LOCAL)
14/09/11 10:20:15 INFO TaskSetManager: Serialized task 2.0:2 as 3469 bytes in 0 ms
14/09/11 10:20:15 INFO Executor: Running task ID 514
14/09/11 10:20:15 INFO TaskSetManager: Finished TID 513 in 7768 ms on localhost (progress: 2/512)
14/09/11 10:20:15 INFO DAGScheduler: Completed ShuffleMapTask(2, 1)
14/09/11 10:20:15 INFO BlockManager: Found block broadcast_0 locally
14/09/11 10:20:15 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 134217728, targetRequestSize: 26843545
14/09/11 10:20:15 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 512 non-empty blocks out of 512 blocks
14/09/11 10:20:15 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms
1) DAGScheduler: failed: Set() 是什么意思?我认为它并不重要,因为它是 INFO 级别,但你永远不知道。
2) Missing parents 是什么意思?同样,它是信息。
这是 jstack 的输出:
"Service Thread" #20 daemon prio=9 os_prio=0 tid=0x00007f39400ff000 nid=0x10560 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C1 CompilerThread14" #19 daemon prio=9 os_prio=0 tid=0x00007f39400fa000 nid=0x1055f waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C1 CompilerThread13" #18 daemon prio=9 os_prio=0 tid=0x00007f39400f8000 nid=0x1055e waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C1 CompilerThread12" #17 daemon prio=9 os_prio=0 tid=0x00007f39400f6000 nid=0x1055d waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C1 CompilerThread11" #16 daemon prio=9 os_prio=0 tid=0x00007f39400f4000 nid=0x1055c waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C1 CompilerThread10" #15 daemon prio=9 os_prio=0 tid=0x00007f39400f1800 nid=0x1055b waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread9" #14 daemon prio=9 os_prio=0 tid=0x00007f39400ef800 nid=0x1055a waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread8" #13 daemon prio=9 os_prio=0 tid=0x00007f39400ed800 nid=0x10559 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread7" #12 daemon prio=9 os_prio=0 tid=0x00007f39400eb800 nid=0x10558 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread6" #11 daemon prio=9 os_prio=0 tid=0x00007f39400e9800 nid=0x10557 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread5" #10 daemon prio=9 os_prio=0 tid=0x00007f39400e7800 nid=0x10556 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread4" #9 daemon prio=9 os_prio=0 tid=0x00007f39400dd000 nid=0x10555 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread3" #8 daemon prio=9 os_prio=0 tid=0x00007f39400db000 nid=0x10554 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007f39400d8800 nid=0x10553 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007f39400d7000 nid=0x10552 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007f39400d4000 nid=0x10551 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007f39400d2000 nid=0x10550 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f39400a2800 nid=0x1054f in Object.wait() [0x00007f38d39f8000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:142)
- locked <0x00000000e0038180> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:158)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f39400a0800 nid=0x1054e in Object.wait() [0x00007f38d3af9000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)
- locked <0x00000000e00161b8> (a java.lang.ref.Reference$Lock)
"main" #1 prio=5 os_prio=0 tid=0x00007f394000a000 nid=0x10535 in Object.wait() [0x00007f3945ee1000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000e03df000> (a org.apache.spark.scheduler.JobWaiter)
at java.lang.Object.wait(Object.java:502)
at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
- locked <0x00000000e03df000> (a org.apache.spark.scheduler.JobWaiter)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:452)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1051)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1069)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1083)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1097)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:716)
at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:294)
at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:44)
at spew.Driver.run(Driver.java:88)
at spew.Main.main(Main.java:92)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
"VM Thread" os_prio=0 tid=0x00007f3940099800 nid=0x1054d runnable
"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f394001f800 nid=0x10536 runnable
"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f3940021000 nid=0x10537 runnable
"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007f3940023000 nid=0x10538 runnable
"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007f3940024800 nid=0x10539 runnable
"GC task thread#4 (ParallelGC)" os_prio=0 tid=0x00007f3940026800 nid=0x1053a runnable
"GC task thread#5 (ParallelGC)" os_prio=0 tid=0x00007f3940028000 nid=0x1053b runnable
"GC task thread#6 (ParallelGC)" os_prio=0 tid=0x00007f394002a000 nid=0x1053c runnable
"GC task thread#7 (ParallelGC)" os_prio=0 tid=0x00007f394002b800 nid=0x1053d runnable
"GC task thread#8 (ParallelGC)" os_prio=0 tid=0x00007f394002d000 nid=0x1053e runnable
"GC task thread#9 (ParallelGC)" os_prio=0 tid=0x00007f394002f000 nid=0x1053f runnable
"GC task thread#10 (ParallelGC)" os_prio=0 tid=0x00007f3940030800 nid=0x10540 runnable
"GC task thread#11 (ParallelGC)" os_prio=0 tid=0x00007f3940032800 nid=0x10541 runnable
"GC task thread#12 (ParallelGC)" os_prio=0 tid=0x00007f3940034000 nid=0x10542 runnable
"GC task thread#13 (ParallelGC)" os_prio=0 tid=0x00007f3940036000 nid=0x10543 runnable
"GC task thread#14 (ParallelGC)" os_prio=0 tid=0x00007f3940037800 nid=0x10544 runnable
"GC task thread#15 (ParallelGC)" os_prio=0 tid=0x00007f3940039800 nid=0x10545 runnable
"GC task thread#16 (ParallelGC)" os_prio=0 tid=0x00007f394003b000 nid=0x10546 runnable
"GC task thread#17 (ParallelGC)" os_prio=0 tid=0x00007f394003d000 nid=0x10547 runnable
"GC task thread#18 (ParallelGC)" os_prio=0 tid=0x00007f394003e800 nid=0x10548 runnable
"GC task thread#19 (ParallelGC)" os_prio=0 tid=0x00007f3940040800 nid=0x10549 runnable
"GC task thread#20 (ParallelGC)" os_prio=0 tid=0x00007f3940042000 nid=0x1054a runnable
"GC task thread#21 (ParallelGC)" os_prio=0 tid=0x00007f3940044000 nid=0x1054b runnable
"GC task thread#22 (ParallelGC)" os_prio=0 tid=0x00007f3940045800 nid=0x1054c runnable
"VM Periodic Task Thread" os_prio=0 tid=0x00007f3940102000 nid=0x10561 waiting on condition
JNI global references: 422
有没有人遇到过这样的 Spark 问题?这很奇怪,因为它适用于小型(微型)数据集(测试装置等)。
最佳答案
我不回答这么老的问题spark版本,但我知道当应用程序挂起时,可能是由于您的资源被终止(例如 yarn)。
我在 Is Spark's KMeans unable to handle bigdata? 中遇到了类似的问题你能做的最好的事情就是微调你的应用程序,因为你的问题中没有任何信息可以建议如何解决这个问题。
您还可以根据经验微调分区的数量。
对于另一项我必须微调以扩展到 15T 数据的工作,我在 memoryOverhead issue in Spark 中报告了我的方法。 ,但我不知道这是否相关。
正如 Karl Higley 所建议的,我同意:
有向无环图 (DAG) 调度失败意味着失败阶段集为空(即还没有任何失败。)。
Missing parents 是阶段的列表,其结果需要计算请求的结果并且尚未缓存在内存中。
关于hadoop - Spark 1.0.2(也是 1.1.0)卡在一个分区上,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25787639/
使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta
我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何
我想要做的是有2个不同的Controller,client和test_client。客户端Controller已经构建,我想创建一个test_clientController,我可以使用它来玩弄客户端的UI并根据需要进行调整。我主要是想绕过我在客户端中内置的验证及其对加载数据的管理Controller的依赖。所以我希望test_clientController加载示例数据集,然后呈现客户端Controller的索引View,以便我可以调整客户端UI。就是这样。我在test_clients索引方法中试过这个:classTestClientdefindexrender:template=>
为什么4.1%2返回0.0999999999999996?但是4.2%2==0.2。 最佳答案 参见此处:WhatEveryProgrammerShouldKnowAboutFloating-PointArithmetic实数是无限的。计算机使用的位数有限(今天是32位、64位)。因此计算机进行的浮点运算不能代表所有的实数。0.1是这些数字之一。请注意,这不是与Ruby相关的问题,而是与所有编程语言相关的问题,因为它来自计算机表示实数的方式。 关于ruby-为什么4.1%2使用Ruby返
如果您尝试在Ruby中的nil对象上调用方法,则会出现NoMethodError异常并显示消息:"undefinedmethod‘...’fornil:NilClass"然而,有一个tryRails中的方法,如果它被发送到一个nil对象,它只返回nil:require'rubygems'require'active_support/all'nil.try(:nonexisting_method)#noNoMethodErrorexceptionanymore那么try如何在内部工作以防止该异常? 最佳答案 像Ruby中的所有其他对象
关闭。这个问题需要detailsorclarity.它目前不接受答案。想改进这个问题吗?通过editingthispost添加细节并澄清问题.关闭8年前。Improvethisquestion为什么SecureRandom.uuid创建一个唯一的字符串?SecureRandom.uuid#=>"35cb4e30-54e1-49f9-b5ce-4134799eb2c0"SecureRandom.uuid方法创建的字符串从不重复?
rpartition和partition有什么区别?我已经阅读了文档,但我认为它们是一样的。只是那些出现在后来的ruby版本中吗? 最佳答案 以下示例将有助于识别差异:"abccba".partition("b")#=>["a","b","ccba"]"abccba".rpartition("b")#=>["abcc","b","a"]所以区别在于rpartition搜索最右边的匹配项,而不是最左边的匹配项。 关于Rubyrpartition与分区?,我们在StackOverflow
我有一个正在构建的应用程序,我需要一个模型来创建另一个模型的实例。我希望每辆车都有4个轮胎。汽车模型classCar轮胎模型classTire但是,在make_tires内部有一个错误,如果我为Tire尝试它,则没有用于创建或新建的activerecord方法。当我检查轮胎时,它没有这些方法。我该如何补救?错误是这样的:未定义的方法'create'forActiveRecord::AttributeMethods::Serialization::Tire::Module我测试了两个环境:测试和开发,它们都因相同的错误而失败。 最佳答案
我想在Ruby中创建一个用于开发目的的极其简单的Web服务器(不,不想使用现成的解决方案)。代码如下:#!/usr/bin/rubyrequire'socket'server=TCPServer.new('127.0.0.1',8080)whileconnection=server.acceptheaders=[]length=0whileline=connection.getsheaders想法是从命令行运行这个脚本,提供另一个脚本,它将在其标准输入上获取请求,并在其标准输出上返回完整的响应。到目前为止一切顺利,但事实证明这真的很脆弱,因为它在第二个请求上中断并出现错误:/usr/b
我想让一个yaml对象引用另一个,如下所示:intro:"Hello,dearuser."registration:$introThanksforregistering!new_message:$introYouhaveanewmessage!上面的语法只是它如何工作的一个例子(这也是它在thiscpanmodule中的工作方式。)我正在使用标准的rubyyaml解析器。这可能吗? 最佳答案 一些yaml对象确实引用了其他对象:irb>require'yaml'#=>trueirb>str="hello"#=>"hello"ir