草庐IT

记录一则Spark读写和Lost Excutor错误的分析和解决过程

zengzhaozheng 2023-03-28 原文
一、概述

  上篇blog记录了些在用spark-sql时遇到的一些问题,今天继续记录用Spark提供的RDD转化方法开发公司第一期标签分析系统(一部分scala作业逻辑代码后面blog再给大家分享)遇到的一些SPARK作业错误信息。其中有些问题可能一些数据量或者shuffle量比较小的作业时不会遇到的,我们整套标签系统的初级输入数据大概是8T左右,这里也是个参考。(下面的Spark部署模式为spark on yarn)

二、问题

1、大规模数据往HDFS中写时候,报了HDFS读写超时,具体日志看下面。

(1)具体到某个Excutor的错误日志:

(2)具体到各个数据节点DataNode的日志:

分析:

从这两个错误信息首先可以将错误定位到整个HDFS的读写过程中,其中对于读写超时可以定位到2个参数:dfs.client.socket-timeout(默认60s)、dfs.datanode.socket.write.timeout(默认80s)。在spark的程序中按照自己的实际情况设置这两个值,问题可以解决。给个例子:

val dwd_new_pc_list_patch = "/user/hive/warehouse/pc.db/dwd_new_pc_list/2015-01-*/action=play" val sparkConf = new SparkConf().setAppName("TagSystem_compositeTag")   .set("spark.kryoserializer.buffer.max.mb", "128").set("spark.rdd.compress","true") val sc = new SparkContext(sparkConf) //hdfs客户端的读写超时时间 //默认60000 sc.hadoopConfiguration.set("dfs.client.socket-timeout", "180000") //默认80000 sc.hadoopConfiguration.set("dfs.datanode.socket.write.timeout", "180000") val sqlContext = new org.apache.spark.sql.SQLContext(sc) val hiveSqlContext = new org.apache.spark.sql.hive.HiveContext(sc) //(user_id,fo,fo_2,sty,fs) val source = sc.textFile(dwd_new_pc_list_patch).filter(p => (p.trim != "" && p.split("\\|").length >= 105)).mapPartitions({ it =>   for {     line <- it   } yield (line.split("\\|")(21), line.split("\\|")(9), line.split("\\|")(104), line.split("\\|")(40), line.split("\\|")(7)) }).persist(StorageLevel.MEMORY_AND_DISK_SER) . . .另外相似问题:https://jira.spring.io/si/jira.issueviews:issue-html/SHDP-404/SHDP-404.html

2、由spark.reducer.maxMbInFlight引起的Lost Excutor问题。

 这个错误主要是发生在shuffle中的fetch阶段,由于Excutor 已经lost掉了,由于容错机制另外重新启动一个Excutor,但是在之前lost掉的Excutor中保存的blockManager已经完全丢失,所以之前的stage需要重新计算。具体在dirver或者CoarseGrainedExecutorBackend的日志主要提示超时和读写文件失败,截了下超时的错误提示:

解决方法:

  处理Lost Excutor问题还是花了比较长的时间,调整了很多参数都不行。最后将spark.reducer.maxMbInFlight调小或者将spark.shuffle.copier.threads调小问题解决。在家里还是详细的研究了下spark.reducer.maxMbInFlight这个参数的具体机制含义。spark.reducer.maxMbInFlight官方的配置文档的说明有些笼统:大概的意思是同事从reduce task中取出的ShuffleTask输出最大值(默认48MB)。这个从字面上理解还是不怎么容易的,从源码上search这个参数,定位到org.apache.spark.storage.BlockFetcherIterator.BasicBlockFetcherIterator#splitLocalRemoteBlocks

    protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {       // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them       // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5       // nodes, rather than blocking on reading output from one node.       //每个fetch线程获取的数据量大小(默认5个fetch线程)       val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)       logInfo("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)       // Split local and remote blocks. Remote blocks are further split into FetchRequests of size       // at most maxBytesInFlight in order to limit the amount of data in flight.       val remoteRequests = new ArrayBuffer[FetchRequest]       var totalBlocks = 0       for ((address, blockInfos) <- blocksByAddress) { //  address实际上是executor_id         totalBlocks += blockInfos.size         if (address == blockManagerId) {            // Filter out zero-sized blocks           localBlocksToFetch ++= blockInfos.filter(_._2 != 0).map(_._1)           _numBlocksToFetch += localBlocksToFetch.size         } else {           val iterator = blockInfos.iterator           var curRequestSize = 0L           var curBlocks = new ArrayBuffer[(BlockId, Long)]           while (iterator.hasNext) {           // blockId 是org.apache.spark.storage.ShuffleBlockId,           // 格式:"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId             val (blockId, size) = iterator.next()             // Skip empty blocks             if (size > 0) {                curBlocks += ((blockId, size))               remoteBlocksToFetch += blockId               _numBlocksToFetch += 1               curRequestSize += size             } else if (size < 0) {               throw new BlockException(blockId, "Negative block size " + size)             }              // 避免一次请求的数据量过大             if (curRequestSize >= targetRequestSize) {               // Add this FetchRequest               remoteRequests += new FetchRequest(address, curBlocks)               curBlocks = new ArrayBuffer[(BlockId, Long)]               logDebug(s"Creating fetch request of $curRequestSize at $address")               curRequestSize = 0             }           }           // Add in the final request           // 将剩余的请求放到最后一个request中。           if (!curBlocks.isEmpty) {              remoteRequests += new FetchRequest(address, curBlocks)           }         }       }       logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +         totalBlocks + " blocks")       remoteRequests     }从代码上看我的个人理解是在shuffle节点每个reduce task会启动5个fetch线程(可以由spark.shuffle.copier.threads配置)去最多spark.reducer.maxMbInFlight个(默认5)其他Excuctor中获取文件位置,然后去fetch它们,并且每次fetch的抓取量不会超过spark.reducer.maxMbInFlight(默认值为48MB)/5。这种机制我个人理解,第一:可以减少单个fetch连接的网络IO、第二:这种将fetch数据并行执行有助于抓取速度提高,减少请求数据的抓取时间总和。

  回来结合我现在的问题分析,我将spark.reducer.maxMbInFlight调小,从而减少了每个reduce task中的每个fetch线程的抓取数据量,进而减少了每个fetch连接的持续连接时间,降低了由于reduce task过多导致每个Excutor中存在的fetch线程太多而导致的fetch超时,另外降低内存的占用。

上述分析为个人理解,如有更深入的想法欢迎交流。


有关记录一则Spark读写和Lost Excutor错误的分析和解决过程的更多相关文章

  1. ruby-on-rails - Rails 常用字符串(用于通知和错误信息等) - 2

    大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje

  2. ruby - Sinatra:运行 rspec 测试时记录噪音 - 2

    Sinatra新手;我正在运行一些rspec测试,但在日志中收到了一堆不需要的噪音。如何消除日志中过多的噪音?我仔细检查了环境是否设置为:test,这意味着记录器级别应设置为WARN而不是DEBUG。spec_helper:require"./app"require"sinatra"require"rspec"require"rack/test"require"database_cleaner"require"factory_girl"set:environment,:testFactoryGirl.definition_file_paths=%w{./factories./test/

  3. ruby-on-rails - 迷你测试错误 : "NameError: uninitialized constant" - 2

    我遵循MichaelHartl的“RubyonRails教程:学习Web开发”,并创建了检查用户名和电子邮件长度有效性的测试(名称最多50个字符,电子邮件最多255个字符)。test/helpers/application_helper_test.rb的内容是:require'test_helper'classApplicationHelperTest在运行bundleexecraketest时,所有测试都通过了,但我看到以下消息在最后被标记为错误:ERROR["test_full_title_helper",ApplicationHelperTest,1.820016791]test

  4. ruby-on-rails - 如何在 Rails View 上显示错误消息? - 2

    我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c

  5. 使用 ACL 调用 upload_file 时出现 Ruby S3 "Access Denied"错误 - 2

    我正在尝试编写一个将文件上传到AWS并公开该文件的Ruby脚本。我做了以下事情:s3=Aws::S3::Resource.new(credentials:Aws::Credentials.new(KEY,SECRET),region:'us-west-2')obj=s3.bucket('stg-db').object('key')obj.upload_file(filename)这似乎工作正常,除了该文件不是公开可用的,而且我无法获得它的公共(public)URL。但是当我登录到S3时,我可以正常查看我的文件。为了使其公开可用,我将最后一行更改为obj.upload_file(file

  6. ruby-on-rails - 错误 : Error installing pg: ERROR: Failed to build gem native extension - 2

    我克隆了一个rails仓库,我现在正尝试捆绑安装背景:OSXElCapitanruby2.2.3p173(2015-08-18修订版51636)[x86_64-darwin15]rails-v在您的Gemfile中列出的或native可用的任何gem源中找不到gem'pg(>=0)ruby​​'。运行bundleinstall以安装缺少的gem。bundleinstallFetchinggemmetadatafromhttps://rubygems.org/............Fetchingversionmetadatafromhttps://rubygems.org/...Fe

  7. ruby - #之间? Cooper 的 *Beginning Ruby* 中的错误或异常 - 2

    在Cooper的书BeginningRuby中,第166页有一个我无法重现的示例。classSongincludeComparableattr_accessor:lengthdef(other)@lengthother.lengthenddefinitialize(song_name,length)@song_name=song_name@length=lengthendenda=Song.new('Rockaroundtheclock',143)b=Song.new('BohemianRhapsody',544)c=Song.new('MinuteWaltz',60)a.betwee

  8. ruby-on-rails - 每次我尝试部署时,我都会得到 - (gcloud.preview.app.deploy) 错误响应 : [4] DEADLINE_EXCEEDED - 2

    我是Google云的新手,我正在尝试对其进行首次部署。我的第一个部署是RubyonRails项目。我基本上是在关注thisguideinthegoogleclouddocumentation.唯一的区别是我使用的是我自己的项目,而不是他们提供的“helloworld”项目。这是我的app.yaml文件runtime:customvm:trueentrypoint:bundleexecrackup-p8080-Eproductionconfig.ruresources:cpu:0.5memory_gb:1.3disk_size_gb:10当我转到我的项目目录并运行gcloudprevie

  9. ruby-on-rails - Rails 5 Active Record 记录无效错误 - 2

    我有两个Rails模型,即Invoice和Invoice_details。一个Invoice_details属于Invoice,一个Invoice有多个Invoice_details。我无法使用accepts_nested_attributes_forinInvoice通过Invoice模型保存Invoice_details。我收到以下错误:(0.2ms)BEGIN(0.2ms)ROLLBACKCompleted422UnprocessableEntityin25ms(ActiveRecord:4.0ms)ActiveRecord::RecordInvalid(Validationfa

  10. arrays - 这是 Ruby 中 Array.fill 方法的错误吗? - 2

    这个问题在这里已经有了答案:Arraysmisbehaving(1个回答)关闭6年前。是否应该这样,即我误解了,还是错误?a=Array.new(3,Array.new(3))a[1].fill('g')=>[["g","g","g"],["g","g","g"],["g","g","g"]]它不应该导致:=>[[nil,nil,nil],["g","g","g"],[nil,nil,nil]]

随机推荐