草庐IT

Spark源码解析(一):RDD之Transfrom算子

打卡从这里开始 2023-03-31 原文

一、延迟计算

RDD 代表的是分布式数据形态,因此,RDD 到 RDD 之间的转换,本质上是数据形态上的转换(Transformations)

在 RDD 的编程模型中,一共有两种算子,Transformations 类算子和 Actions 类算子。开发者需要使用 Transformations 类算子,定义并描述数据形态的转换过程,然后调用 Actions 类算子,将计算结果收集起来、或是物化到磁盘。

在这样的编程模型下,Spark 在运行时的计算被划分为两个环节。

  1. 基于不同数据形态之间的转换,构建计算流图(DAG,Directed Acyclic Graph)
  2. 通过 Actions 类算子,以回溯的方式去触发执行这个计算流图

换句话说,开发者调用的各类 Transformations 算子,并不立即执行计算,当且仅当开发者调用 Actions 算子时,之前调用的转换算子才会付诸执行。在业内,这样的计算模式有个专门的术语,叫作“延迟计算”(Lazy Evaluation)。

二、Spark算子分类

在 RDD 的开发框架下,哪些算子属于 Transformations 算子,哪些算子是 Actions 算子呢?

这里给出一张自己在极客看的课程中的图

三、Transform算子执行流程(源码)

Map转换算是 RDD 的经典转换操作之一了.就以它开头.Map的源码如下:

1. sc.clean(f)

首先掉了一个sc.clean(f) , 我们进到clean函数里看下:

注释中明确提到了这个函数的功能:clean 整理一个闭包,使其可以序列化并发送到任务.

这里的代码有些多,大概知道这个函数的功能是这样就ok了,闭包的问题会在另一篇文章里仔细介绍

2. MapPartitionsRDD

进入到函数后源码如下:

这是一个MapPartitionsRDD。我们仔细看它的构成,从而来理解它是如何描述MapPartitionsRDD的.

2.1 var prev:RDD[T]

这里的 prev 就是父RDD,f 则是Map中传入的处理函数,除了这两个就没有了,也就是说明 RDD中没有存储具体的数据本身

这再次印证了转换不会产生任何数据.它只是单纯了记录父RDD以及如何转换的过程就完了,不会在转换阶段产生任何数据集

2.2 preservesPartitioning

preservesPartitioning 表示是否保持父RDD的分区信息.
如果为false(默认为false),则会对结果重新分区.也就是Map系默认都会分区
如果为true,保留分区. 则按照 firstParent 保留分区   

可以看到根据 dependencies 找到其第一个父 RDD

2.3 compute 计算逻辑
2.3.1 compute方法

RDD 抽象类要求其所有子类都必须实现 compute 方法,该方法接受的参数之一是一个Partition 对象,目的是计算该分区中的数据。

override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))

可以看到,compute 方法调用当前 RDD 内的第一个父 RDD 的 iterator 方法,该方的目的是拉取父 RDD 对应分区内的数据。

iterator 方法会返回一个迭代器对象,迭代器内部存储的每个元素即父 RDD 对应分区内已经计算完毕的数据记录。得到的迭代器作为 f 方法的一个参数。fRDD 类的 map 方法中指定,即实际的转换函数。

compute 方法会将迭代器中的记录一一输入 f 方法,得到的新迭代器即为所求分区中的数据。

其他 RDD 子类的 compute 方法与之类似,在需要用到父 RDD 的分区数据时候,就会调用 iterator 方法,然后根据需求在得到的数据之上执行粗粒度的操作。换句话说,compute 函数负责的是父 RDD 分区数据到子 RDD 分区数据的变换逻辑。

2.3.2 iterator方法

此方法的实现在 RDD 这个抽象类中

/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementers of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

interator首先检查 存储级别 storageLevel:此处可参考RDD持久化

如果存储级别不是NONE, 说明分区的数据说明分区的数据要么已经存储在文件系统当中,要么当前 RDD 曾经执行过 cachepersise 等持久化操作,此时需要从存储空间读取分区数据,调用 getOrCompute 方法

getOrCompute 方法会根据 RDD 编号:id分区编号:partition.index 计算得到当前分区在存储层对应的块编号:blockId,通过存储层提供的数据读取接口提取出块的数据。

代码中的这几句注释给的非常到位,大致的判断顺序如下:

  • 块命中的情况:也就是数据之前已经成功存储到介质中,这其中可能是数据本身就在存储介质中(比如通过读取HDFS创建的RDD),也可能是 RDD 在经过持久化操作并且经历了一次计算过程,这个时候我们就能成功读取数据并将其返回
  • 块未命中的情况:可能是数据已经丢失,或者 RDD 经过持久化操作,但是是当前分区数据是第一次被计算,因此会出现拉取得到数据为 None 的情况。这就意味着我们需要计算分区数据,继续调用 RDDcomputeOrReadCheckpoint 方法来计算数据,并将计算得到的数据缓存到存储介质中,下次就无需再重复计算。

如果当前RDD的存储级别为 None,说明为未经持久化的 RDD,需要重新计算 RDD 内的数据,这时候调用 RDD 类的 computeOrReadCheckpoint 方法,该方法也在持久化 RDD 的分区获取数据失败时被调用。

computeOrReadCheckpoint 方法会检查当前 RDD 是否已经被标记成检查点,如果未被标记成检查点,则执行自身的 compute 方法来计算分区数据,否则就直接拉取父 RDD 分区内的数据。

需要注意的是,对于标记成检查点的情况,当前 RDD 的父 RDD 不再是原先转换操作中提供数据的父 RDD,而是被 Apache Spark 替换成一个 CheckpointRDD 对象,该对象中的数据存放在文件系统中,因此最终该对象会从文件系统中读取数据并返回给 computeOrReadCheckpoint 方法

参考文章:

Cache 和 Checkpoint

有关Spark源码解析(一):RDD之Transfrom算子的更多相关文章

  1. Ruby 解析字符串 - 2

    我有一个字符串input="maybe(thisis|thatwas)some((nice|ugly)(day|night)|(strange(weather|time)))"Ruby中解析该字符串的最佳方法是什么?我的意思是脚本应该能够像这样构建句子:maybethisissomeuglynightmaybethatwassomenicenightmaybethiswassomestrangetime等等,你明白了......我应该一个字符一个字符地读取字符串并构建一个带有堆栈的状态机来存储括号值以供以后计算,还是有更好的方法?也许为此目的准备了一个开箱即用的库?

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby - 用逗号、双引号和编码解析 csv - 2

    我正在使用ruby​​1.9解析以下带有MacRoman字符的csv文件#encoding:ISO-8859-1#csv_parse.csvName,main-dialogue"Marceu","Giveittohimóhe,hiswife."我做了以下解析。require'csv'input_string=File.read("../csv_parse.rb").force_encoding("ISO-8859-1").encode("UTF-8")#=>"Name,main-dialogue\r\n\"Marceu\",\"Giveittohim\x97he,hiswife.\"\

  4. ruby-on-rails - 我更新了 ruby​​ gems,现在到处都收到解析树错误和弃用警告! - 2

    简而言之错误:NOTE:Gem::SourceIndex#add_specisdeprecated,useSpecification.add_spec.Itwillberemovedonorafter2011-11-01.Gem::SourceIndex#add_speccalledfrom/opt/local/lib/ruby/site_ruby/1.8/rubygems/source_index.rb:91./opt/local/lib/ruby/gems/1.8/gems/rails-2.3.8/lib/rails/gem_dependency.rb:275:in`==':und

  5. UE4 源码阅读:从引擎启动到Receive Begin Play - 2

    一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame

  6. ruby - 用 YAML.load 解析 json 安全吗? - 2

    我正在使用ruby2.1.0我有一个json文件。例如:test.json{"item":[{"apple":1},{"banana":2}]}用YAML.load加载这个文件安全吗?YAML.load(File.read('test.json'))我正在尝试加载一个json或yaml格式的文件。 最佳答案 YAML可以加载JSONYAML.load('{"something":"test","other":4}')=>{"something"=>"test","other"=>4}JSON将无法加载YAML。JSON.load("

  7. ruby - 如何使用 Nokogiri 解析纯 HTML 表格? - 2

    我想用Nokogiri解析HTML页面。页面的一部分有一个表,它没有使用任何特定的ID。是否可以提取如下内容:Today,3,455,34Today,1,1300,3664Today,10,100000,3444,Yesterday,3454,5656,3Yesterday,3545,1000,10Yesterday,3411,36223,15来自这个HTML:TodayYesterdayQntySizeLengthLengthSizeQnty345534345456563113003664354510001010100000344434113622315

  8. python - 帮我找到合适的 ruby​​/python 解析器生成器 - 2

    我使用的第一个解析器生成器是Parse::RecDescent,它的指南/教程很棒,但它最有用的功能是它的调试工具,特别是tracing功能(通过将$RD_TRACE设置为1来激活)。我正在寻找可以帮助您调试其规则的解析器生成器。问题是,它必须用python或ruby​​编写,并且具有详细模式/跟踪模式或非常有用的调试技术。有人知道这样的解析器生成器吗?编辑:当我说调试时,我并不是指调试python或ruby​​。我指的是调试解析器生成器,查看它在每一步都在做什么,查看它正在读取的每个字符,它试图匹配的规则。希望你明白这一点。赏金编辑:要赢得赏金,请展示一个解析器生成器框架,并说明它的

  9. ruby - 如何用 Nokogiri 解析连续的标签? - 2

    我有这样的HTML代码:Label1Value1Label2Value2...我的代码不起作用。doc.css("first").eachdo|item|label=item.css("dt")value=item.css("dd")end显示所有首先标记,然后标记标签,我需要“标签:值” 最佳答案 首先,您的HTML应该有和中的元素:Label1Value1Label2Value2...但这不会改变您解析它的方式。你想找到s并遍历它们,然后在每个你可以使用next_element得到;像这样:doc=Nokogiri::HTML(

  10. ruby-on-rails - 如何在 Rails 3 中禁用 XML 解析 - 2

    我想禁用HTTP参数的自动XML解析。但我发现命令仅适用于Rails2.x,它们都不适用于3.0:config.action_controller.param_parsers.deleteMime::XML(application.rb)ActionController::Base.param_parsers.deleteMime::XMLRails3.0中的等价物是什么? 最佳答案 根据CVE-2013-0156的最新安全公告你可以将它用于Rails3.0。3.1和3.2ActionDispatch::ParamsParser::

随机推荐