草庐IT

0007 - MapReduce入门指南

小左先森 2023-03-28 原文
大数据梦工厂(0007 - MapReduce入门指南)


1 - MapReduce简介

Hadoop MapReduce 是一个分布式计算框架(也称为编程模型)。基于它编写的应用程序能够以一种可靠、容错的方式在大规模集群(数千个节点)上并行处理 TB 级别的海量数据集。

MapReduceTask 过程分为两个处理阶段:Map 阶段和 Reduce 阶段。每个阶段都是以 <key, value> 键值对作为输入和输出,也可以自定义编写 map() 函数和 reduce() 函数。

网上一个比较形象的例子解释 MapReduce:

我们要数图书馆中的所有书。 你数 1 号书架,我数 2 号书架。这就是 “Map”。我们人越多,数书就越快。 现在我们到一起,把所有人的统计数加在一起。这就是 “Reduce”。

2 - MapReduce编程模型

MapReduce 作业的输入和输出类型:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output) 以词频统计为例进行说明,MapReduce 处理的数据流程如下:

mr_process.png

MapReduce 的工作流程包括 5 个步骤:

  1. Input - 读取文本文件;
  2. Splitting - 将文本按行拆分(可按空格、逗号、分号、行、换行符 ('\n') 等拆分),得到 K1 表示行数,V1 表示对应行的文本内容;
  3. Mapping - 将每一行按空格并行拆分处理,排序得到 List(K2,V2),其中 K2 表示每一个单词,由于是做词频统计,所以 V2 的值为 1,代表出现 1 次;
  4. Shuffling - 由于 Mapping 操作可能是在不同的机器上并行处理,所以需要通过 Shuffling 将相同 key 值的数据分发到同一个节点上去合并,然后排序,此时得到 K2 为每一个单词,List(V2) 为可迭代集合,V2 就是 Mapping 中的 V2
  5. Reducing - 该模型是统计每个单词出现的总次数,所以 ReducingList(V2) 进行归约求和操作,将这些数据元组组合成一个更小的元组集;
  6. Final Result - 最终输出文件保存在文件系统(如:HDFS)上。

3 - MapReduce2工作机制

图片来自《Hadoop: The Definitive Guide》

1、run job 通过 Job 对象的 submit() 或者 waitForCompletion() 方法创建一个内部的 JobSummiter 实例,提交作业。waitForCompletion() 方法通过每秒循环轮询作业进度,如果发现与上次报告有改变,则把进度报告到控制台。

  • Job.submit() - 将作业提交到集群并立即返回。
  • Job.waitForCompletion() - 将作业提交到集群并等待它完成。
2、get new application 向 ResourceManager 申请全局唯一 Application ID,RM 检查作业的输出说明、计算作业的输入分片。

3、copy job resources 将运行作业所需的资源(包括作业 JAR 文件、配置文件和输入分片)复制到一个以作业 ID 命名的 HDFS 共享目录中。

4、submit application 通过 submitApplication() 方法提交作业到 ResourceManager。

5a、start container ResourceManager 收到 submitApplication() 消息后,便将请求传递给 YARN 调度器(Scheduler)。

5b、launch 调度器为其分配一个容器(Container),RM 在 NodeManager 的管理下在容器中启动 MRAppMaster 进程。

6、initialize job ApplicationMaster 是一个 Java 应用程序,主类是 MRAppMaster,对作业进行初始化,创建多个薄记对象以跟踪作业进度。

7、retrieve input splits ApplicationMaster 接受来自 HDFS 共享目录的、客户端计算的输入分片。对每一个分片创建一个 MapTask 对象以及由 mapreduce.job.reduces 参数设置 reduce 个数。任务 ID 在此时分配。

如果作业很小就会启动一个 JVM 运行 MapReduce 作业,称为 uberized 或者作为 uber 任务运行。通过设置 mapreduce.job.ubertask.enable 为 true 使用。

哪些是小作业? 当小于 10 个 mapper 且只有 1 个 reducer 且输入大小小于一个 HDFS 块的作业。

8、allocate resources 如果作业不适合作为 uber 任务运行, ApplicationMaster 就会为该作业中的所有 MapTask 和 ReduceTask 向 RM 请求容器。每个 mapper 和 reducer 都默认分配到 1024 MB 内存和 1 个虚拟内核。但 MapTask 完成 5% 时(mapreduce.job.reduce.slowstart.completedmaps,默认 0.05),ReduceTask 才会开始运行,以减少 ReduceTask 的等待时间。

分别通过以下 4 个参数来设置

mapreduce.map.memory.mb mapreduce.reduce.memory.mb mapreduce.map.cpu.vcores mapreduce.reduce.cpu.vcores 9、start container 一旦 RM 的调度器为任务分配了容器,ApplicationMaster 就与 NM 通信启动容器。

9b、launch 该任务有主类 该任务由主类 YarnChild 的 Java 应用程序执行。

10、retrieve job resources 在运行任务之前,首先将任务需要的资源本地化,包括作业配置、JAR 文件和所有来自 HDFS 共享目录的文件。

11、run 运行 MapTask 或 ReduceTask。

12、进度和状态的更新 MapReduce 作业是长时间(从数秒到数小时)运行的批量作业。因此,能够得知关于作业进展的一些反馈很重要。例如,作业或任务的状态(运行中、成功完成、失败)、MapTask 和 ReduceTask 的进度、作业计数器的值、状态消息或描述等实时信息。

在作业运行期间,客户端每秒轮询一次 ApplicationMaster 以接收最新状态(轮询间隔通过 mapreduce.client.progressmonitor.pollinterval 设置,以毫秒为单位)。客户端也可以使用 Job 的 getStatus() 方法得到一个 JobStatus 的实例,包含作业的所有状态信息。如下图所示:

图片来自《Hadoop: The Definitive Guide》

4 - MapReduce数据流

4.1 - 一个 Reduce 任务

一个 Reduce 任务的 MapReduce 数据流,通常是所有的 Map 任务的输出。

图片来自《Hadoop: The Definitive Guide》

4.2 - 多个 Reduce 任务

多个 Reduce 任务的 MapReduce 数据流,每个 Reduce 任务的输入都是来自多个 Map 任务(Map 任务和 Reduce 任务之间的数据流称之为 Shuffle)。

图片来自《Hadoop: The Definitive Guide》

4.3 - 无 Reduce 任务

无 Reduce 任务的 MapReduce 数据流,数据处理可以完全并行(即无需 Shuffle),唯一的非本地节点数据传输是 Map 任务将结果写入到 HDFS。

图片来自《Hadoop: The Definitive Guide》

5 - Map & Reduce

5.1 - Map

1、Map 概述

MapReduce 框架根据作业的 InputFormat 来:

  • 检查作业输入的有效性(基于行的日志文件、二进制格式文件、数据库表等);
  • 把输入文件切分成多个 InputSplit 实例,切片大小,默认等于 Block 大小(128MB);
  • 通过 RecordReader 读取 InputSplit 转换为标准的 <key, value> 键值对,作为 Map 输出,直到读取完成;
  • 一旦读取完成,这些键值对被发送到 Mapper 处理,并提供输出 <key, value> 键值对;
  • Mapper 的输出称为中间输出,把输出作业结果写到文件系统(如:HDFS)上。
2、需要多少个 Map? Map 的数量通常是由输入数据的总大小决定,即输入文件的总块(Block)数

Map 正常的并行规模大致是每个节点(Node)大约 10-100 个 Map,对于 CPU 消耗较小的 MapTask 可以设到 300 个左右。由于每个任务初始化需要一定的时间,因此,Map 执行的时间至少超过 1分钟。

InputFormat 决定 Map 的数量:

Map = {(总数据大小)/(每个块(block)的大小))} 例如:如果数据总大小是 1TB,并且 InputSplit 大小是 128MB,那么:

Map = (1*1024*1024)/128 = 8192

5.2 - Reduce

1、Reduce 概述

MapReduce 框架根据作业的 OutputFormat 来:

  • 检验作业的输出,例如:检查输出路径是否已经存在。
  • 通过 RecordWriter 把输出作业结果写到文件系统(如:HDFS)上。
Reduce 有三个阶段:

  • Shuffle - Reducer 的输入就是 Mapper 已经排好序的输出。
  • Sort - 按照 key 值对 Reducer 的输入进行分组。Shuffle 和 Sort 是同时进行的。
  • Reduce - 将键值对组合起来,并根据实现的业务逻辑输出结果写到文件系统(如:HDFS)上,且没有排序。
2、需要多少个 Reduce? Reduce 的数量计算如下:

Reduce = <0.95 或 1.75> * (<节点数> * <每个节点的最大容器数>)
  • 使用 0.95,所有 ReduceTask 会在 MapTask 完成时立刻启动,开始传输 Map 的输出结果。
  • 使用 1.75,第一轮 ReduceTask 由速度更快的节点完成;第二轮 ReduceTask 启动,这样可以得到比较好的负载均衡的效果。
增加 Reduce 的数量会增加整个 MapReduce 框架的开销,但可以改善负载均衡,降低由于执行失败带来的负面影响。

上述比例因子略小于整数,是为了给 MapReduce 框架中的推测性任务(speculative-tasks) 或失败任务预留一些 Reduce 的资源。

6 - Combiner & Partitioner

6.1 - Combiner

1、Combiner 概述 Combiner 在 Map 阶段,对每一个 MapTask 所在的节点运行(局部汇总),是一个可选的操作,将同一个 key 值的中间结果合并。这里以词频统计为例:

aa bb cc dd ee aa ff bb cc dd ee ff 输入文件中共有 12 个 keys,Mapper 在遇到一个 aa 的 key 时就会记录为 1,但是这文件里 aa 可能会出现 n 次,那么 Mapper 输出文件冗余就会很多,因此在 Reduce 计算前对相同的 key 做一个合并操作,那么需要传输的数据量就会减少,传输效率就可以得到提升。

但并非所有场景都适合使用 Combiner,使用它的原则是 Combiner 的输出不会影响到 Reduce 计算的最终输入,例如:求总数,最大值,最小值时都可以使用 Combiner,但是做平均值计算则不能使用 Combiner。

因此,Combiner 的用途如下:

  • 最大限度的减少 MapTask 和 ReduceTask 之间数据传输的时间;
  • 减小 MapTask 和 ReduceTask 之间的数据传输量,以减轻 Shuffle 过程中网络带宽占用。
2、使用 Combiner 词频统计示例 想要使用 Combiner 功能只要在组装作业时,添加下面一行代码即可:

// 设置 Combiner job.setCombinerClass(IntSumReducer.class); 输入文件中共有 12 个 keys。输入通过 Mapper 处理,相同的 12 个 keys 键值对作为输入发送到 Reducer。

① 不使用 Combiner 的情况:

图片来源:TutorialsCampus

② 使用 Combiner 的情况:

图片来源:TutorialsCampus

可以看到,使用 Combiner 的时候,需要传输到 Reducer 中的数据由 12 Keys,降低到 10 Keys,降低的幅度取决于 Keys 的重复率。利用 Combiner 来减少通过 Shuffle 传输的数据量。

6.2 - Partitioner

Partitioner (分区)用于划分 key 值空间(key space),也可以理解成分类器。

默认 Partitioner 是根据 Map 的输出结果按照 key 值的不同分别给对应的 Reduce。支持自定义实现。例如:将统计结果按照手机归属地,不同的省份输出到不同的文件中。


::: hljs-center

扫一扫,我们的故事就开始了。

:::

有关0007 - MapReduce入门指南的更多相关文章

  1. LC滤波器设计学习笔记(一)滤波电路入门 - 2

    目录前言滤波电路科普主要分类实际情况单位的概念常用评价参数函数型滤波器简单分析滤波电路构成低通滤波器RC低通滤波器RL低通滤波器高通滤波器RC高通滤波器RL高通滤波器部分摘自《LC滤波器设计与制作》,侵权删。前言最近需要学习放大电路和滤波电路,但是由于只在之前做音乐频谱分析仪的时候简单了解过一点点运放,所以也是相当从零开始学习了。滤波电路科普主要分类滤波器:主要是从不同频率的成分中提取出特定频率的信号。有源滤波器:由RC元件与运算放大器组成的滤波器。可滤除某一次或多次谐波,最普通易于采用的无源滤波器结构是将电感与电容串联,可对主要次谐波(3、5、7)构成低阻抗旁路。无源滤波器:无源滤波器,又称

  2. 微信小程序开发入门与实战(Behaviors使用) - 2

    @作者:SYFStrive @博客首页:HomePage📜:微信小程序📌:个人社区(欢迎大佬们加入)👉:社区链接🔗📌:觉得文章不错可以点点关注👉:专栏连接🔗💃:感谢支持,学累了可以先看小段由小胖给大家带来的街舞👉微信小程序(🔥)目录自定义组件-behaviors    1、什么是behaviors    2、behaviors的工作方式    3、创建behavior    4、导入并使用behavior    5、behavior中所有可用的节点    6、同名字段的覆盖和组合规则总结最后自定义组件-behaviors    1、什么是behaviorsbehaviors是小程序中,用于实现

  3. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

  4. ES基础入门 - 2

    ES一、简介1、ElasticStackES技术栈:ElasticSearch:存数据+搜索;QL;Kibana:Web可视化平台,分析。LogStash:日志收集,Log4j:产生日志;log.info(xxx)。。。。使用场景:metrics:指标监控…2、基本概念Index(索引)动词:保存(插入)名词:类似MySQL数据库,给数据Type(类型)已废弃,以前类似MySQL的表现在用索引对数据分类Document(文档)真正要保存的一个JSON数据{name:"tcx"}二、入门实战{"name":"DESKTOP-1TSVGKG","cluster_name":"elasticsear

  5. Ruby 和指南针路径与 yeoman 项目 - 2

    我安装了ruby​​、yeoman,当我运行我的项目时,出现了这个错误:Warning:Running"compass:dist"(compass)taskWarning:YouneedtohaveRubyandCompassinstalledthistasktowork.Moreinfo:https://github.com/gruUse--forcetocontinue.Use--forcetocontinue.我有进入可变session目标的路径,但它不起作用。谁能帮帮我? 最佳答案 我必须运行这个:geminstallcom

  6. 区块链入门教程(6)--WeBASE-Front节点前置服务安装 - 2

    文章目录1.任务背景2.任务目标3.相关知识点4.任务实操4.1安装配置JDK4.2启动FISCOBCOS4.3下载解压WeBASE-Front4.4拷贝sdk证书文件4.5启动节点4.6访问节点4.7检查运行状态5.任务总结1.任务背景FISCOBCOS其实是有控制台管理工具,用来对区块链系统进行各种管理操作。但是对于初学者来说,还是可视化界面更友好,本节就来介绍WeBASE管理平台,这是一款微众银行开源的自研区块链中间件平台,可以降低区块链使用的门槛,大幅提高区块链应用的开发效率。微众银行是腾讯牵头设立的民营银行,在国内民营银行里还是比较出名的。微众银行参与FISCOBCOS生态建设,一定

  7. Tcl脚本入门笔记详解(一) - 2

    TCL脚本语言简介•TCL(ToolCommandLanguage)是一种解释执行的脚本语言(ScriptingLanguage),它提供了通用的编程能力:支持变量、过程和控制结构;同时TCL还拥有一个功能强大的固有的核心命令集。TCL经常被用于快速原型开发,脚本编程,GUI和测试等方面。•实际上包含了两个部分:一个语言和一个库。首先,Tcl是一种简单的脚本语言,主要使用于发布命令给一些互交程序如文本编辑器、调试器和shell。由于TCL的解释器是用C\C++语言的过程库实现的,因此在某种意义上我们又可以把TCL看作C库,这个库中有丰富的用于扩展TCL命令的C\C++过程和函数,所以,Tcl是

  8. Simulink方法总结和避坑指南(一)——Simulink入门与基本调试方法 - 2

    文章目录一、项目场景二、基本模块原理与调试方法分析——信源部分:三、信号处理部分和显示部分:四、基本的通信链路搭建:四、特殊模块:interpretedMATLABfunction:五、总结和坑点提醒一、项目场景  最近一个任务是使用simulink搭建一个MIMO串扰消除的链路,并用实际收到的数据进行测试,在搭建的过程中也遇到了不少的问题(当然这比vivado里面的debug好不知道多少倍)。准备趁着这个机会,先以一个很基本的通信链路对simulink基础和相关的debug方法进行总结。  在本篇中,主要记录simulink的基本原理和基本的SISO通信传输链路(QPSK方式),计划在下篇记

  9. ruby - Ruby gems 的问题(损坏?)试图让指南针在 npm 中工作 - 2

    我不是Ruby专家,但想弄清楚发生了什么,因为我试图让指南针在节点应用程序中工作,但我的Ruby似乎坏了。打字:ruby--version让我:ruby2.1.1p76(2014-02-24revision45161)[x86_64-darwin13.0]我安装了Homebrew,之前遇到过Ruby版本的问题,但它似乎已安装并且可以正常工作。但是,当我使用gem输入请求时,出现此错误:$gem-hErrorloadingRubyGemsplugin"/Users/user_dir/.rvm/gems/ruby-2.1.1@global/gems/executable-hooks-1.3

  10. ESP32学习入门:WiFi连接网络 - 2

    目录一、ESP32简单介绍二、ESP32Wi-Fi模块介绍三、ESP32Wi-Fi编程模型四、ESP32Wi-Fi事件处理流程 五、ESP32Wi-Fi开发环境六、ESP32Wi-Fi具体代码七、ESP32Wi-Fi代码解读6.1主程序app_main7.2自定义代码wifi_init_sta()八、ESP32Wi-Fi连接验证8.1测试方法8.2服务器模拟工具sscom58.3测试代码8.4测试结果前言为了开发一款亚马逊物联网产品,开始入手ESP32模块。为了能够记录自己的学习过程,特记录如下操作过程。一、ESP32简单介绍ESP32是一套Wi-Fi(2.4GHz)和蓝牙(4.2)双模解决方

随机推荐