草庐IT

一文学完Spark常用算子(Spark算子大全)

笑看风云路 2023-04-20 原文

目录

前言

Spark RDD的算子分为转换算子(Transformation)和行动算子(Action)。

转换算子

转换算子分为:Value类型、双Value类型和K-V类型。

一、Value类型

1. map

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
    List(1, 2, 3, 4)
)
val mapRDD = rdd.map(_*2)
mapRDD.collect().foreach(println)
sc.stop()

2. mapPartitions

以分区为单位对数据进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
    List(1, 2, 3, 4), 2
)
val mapRDD = rdd.mapPartitions(datas => datas.map(_*2))
mapRDD.collect().foreach(println)
sc.stop()

注意:

1、会将整个分区的数据加载到内存,如果处理完不被释放,在内存较小并且数据量较大的情况下,容易出现内存溢出(OOM)

2、可以实现一些特殊功能,比如取每个分区中最大值,map无法实现

3. mapPartitionsWithIndex

类似于mapPartitions,比mapPartitions多一个参数来表示分区号

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
    List(1, 2, 3, 4), 2
)
val mapRDD = rdd.mapPartitionsWithIndex((index, datas) =>{
      index match {
        case 1 => datas.map(_ * 2)
        case _ => datas
      }
    })
mapRDD.collect().foreach(println)
sc.stop()

4. flatMap

将处理的数据进行扁平化后再进行映射处理,所以算子也称为扁平映射。返回一个可迭代的集合。

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
    List(List(1, 2), List(3, 4))
)
val fmRDD = rdd.flatMap(
    list => {
        list
    }
)
fmRDD.collect().foreach(println)
sc.stop()

当集合中的数据类型不同时,可以使用match case进行模式匹配,转换成集合类型。

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
    List(List(1, 2),3, List(3, 4))
)
val fmRDD = rdd.flatMap {
    case list: List[_] => list
    case d => List(d)
}
fmRDD.collect().foreach(println)
sc.stop()

5. glom

将RDD中每一个分区变成一个数组,数组中元素类型与原分区中元素类型一致。

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
    List(1, 2, 3, 4),2
)
val gRDD = rdd.glom()
gRDD.collect().foreach(data => println(data.mkString(",")))
sc.stop()

6. groupBy

根据指定的规则进行分组,分区默认不变,数据会被打乱(shuffle)。极限情况下,数据可能会被分到同一个分区中。

一个分区可以有多个组,一个组只能在一个分区中。

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
    List(1, 2, 3, 4),2
)
// groupBy 会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组,相同的key值的数据会被放置在一个组中
def groupFunction(num:Int):Int = {
    num % 2
}
val groupRDD = rdd.groupBy(groupFunction)
groupRDD.collect().foreach(println)
sc.stop()

7. filter

根据指定的规则进行筛选过滤,符合规则的数据保留,不符合的丢弃。

当数据进行筛选过滤后,分区不变,但是分区内数据可能不均衡,导致数据倾斜。

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
    List(1, 2, 3, 4),2
)
val filterRDD = rdd.filter(_ % 2 == 1)
filterRDD.collect().foreach(println)
sc.stop()

8. sample

根据指定规则从数据集中采样数据。通过它可以找到数据倾斜的key。

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
    List(1, 2, 3, 4),2
)
println(rdd.sample(
    true,
    2
    //      1
).collect().mkString((",")))
sc.stop()

9. distinct

将数据集中的数据去重。使用分布式处理方式实现,与内存集合使用HashSet去重方式不同。

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
    List(1, 2, 3, 4, 2, 3, 1),2
)
val distinctRDD = rdd.distinct()
distinctRDD.collect().foreach(println)
// 内存集合distinct的去重方式使用 HashSet 去重
// List(1,1,2,2).distinct
sc.stop()

10. coalesce

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。

当Spark程序中存在过多的小任务时,可以通过coalesce合并分区,减少分区个数,进而减少任务调度成本。

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
    List(1, 2, 3, 4, 5, 6),3
)
// 无shuffle
val coaRDD = rdd.coalesce(2)
// 有shuffle
// val coaRDD = rdd.coalesce(2, true)
coaRDD.saveAsTextFile("output1")
sc.stop()

注意:

1、coalesce默认不会将分区数据打乱重新组合,这种情况会导致数据不均衡,出现数据倾斜

2、可以设置第二个参数为true,进行shuffle处理,让数据均衡

3、扩大分区时,可以使用coalesce(,true)或者repartition

11. sortBy

根据指定规则进行排序,默认升序,设置第二个参数改变排序方式。

默认情况下,不会改变分区个数,但是中间存在shuffle处理。

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(
    List(4, 5, 1, 3, 2, 6),2
)
val sortRDD = rdd.sortBy(num => num)
sortRDD.saveAsTextFile("output")
sc.stop()

二、双Value类型

1. intersection

两个RDD求交集

2. union

两个RDD求并集

3. subtract

两个RDD求差集

4. zip

拉链操作,以键值对的形式进行合并。

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd1 = sc.makeRDD(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(3,4,5,6))

// 要求两个数据源数据类型保持一致
// 求交集
//    val newRDD = rdd1.intersection(rdd2)
// 求并集 : 只是合并不去重,要想去重可以使用 distinct 算子进行去重
//    val newRDD = rdd1.union(rdd2)
// 求差集
//    val newRDD = rdd1.subtract(rdd2)
// 拉链, 对应位置一对一映射,组成(key,value),需要每个对应分区上的数据个数相同
val newRDD = rdd1.zip(rdd2)
println(newRDD.collect().mkString(","))
sc.stop()

注意:

1、intersection,union和subtract要求两个RDD中的数据类型保持一致

2、zip:不要求两个RDD中的数据类型保持一致,但要求分区个数以及对应分区上的数据个数保持一致

三、K-V类型

1. partitionBy

将数据按照指定artitioner重新进行分区,默认的分区器是HashPartitioner。

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(1,2,3,4),2)
val newRDD:RDD[(Int, Int)] = rdd.map((_, 1))
// partitionBy 根据指定的分区规则对数据进行重分区
newRDD.partitionBy(new HashPartitioner(2))
.saveAsTextFile("output")
sc.stop()

2. reduceByKey

将数据按照相同的key对value进行聚合。

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",2)))
// reduceByKey 相同的Key的数据进行value数据的聚合操作
// scala 语言中一般的聚合都是两两聚合,spark基于scala开发的,所以它的聚合也是两两聚合的
// reduceByKey 中如果key的数据只有一个,是不会参与运算的。
val reduceRDD = rdd.reduceByKey(_ + _)
reduceRDD.collect().foreach(println)
sc.stop()

3. groupByKey

将数据按照相同的key对value进行分组,形成一个对偶元祖。

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",2)))
// groupByKey : 将数据源中的数据,相同的key的数据分到一个组中,形成一个对偶元组
// 元组中的第一个元素就是key,第二个元素就是相同key的value集合
val groupRDD = rdd.groupByKey()
groupRDD.collect().foreach(println)
sc.stop()

4. aggregateByKey

根据不同的规则进行分区内计算和分区间计算。

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("a",4)),2)
// 分区内和分区间的计算规则可以不同,也可以相同
rdd.aggregateByKey(0)(
    (x,y) => math.max(x,y),
    (x,y) => x + y
).collect().foreach(println)
sc.stop()

5. foldByKey

aggregateByKey的简化操作,分区内和分区间的计算规则一样

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("a",4)),2)

// 分区内和分区间的计算规则可以相同
//   rdd.aggregateByKey(0)(
//     (x,y) => x + y,
//     (x,y) => x + y
//   ).collect().foreach(println)

// 可以使用foldByKey来简化
rdd.foldByKey(0)(_+_).collect().foreach(println)
sc.stop()

6. combineByKey

针对相同K,将V合并成一个集合。

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))

val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),
                          ("b",4),("b",5),("a",6)),2)

// 获取相同key的数据的平均值 => (a,3) (b,4)

// combineByKey 需要三个参数
// 第一个参数表示:将相同key的第一个数据进行数据结构的转换,实现操作
// 第二个参数表示:分区内的计算规则
// 第三个参数表示:分区间的计算规则

val newRDD = rdd.combineByKey(
    v => (v, 1), // 转换为 tuple是在运行当中动态得到的,所以下面的tuple需要添加数据类型
    (t:(Int, Int), v) => {
        (t._1 + v, t._2 + 1)
    },
    (t1:(Int, Int), t2:(Int, Int)) => {
        (t1._1 + t2._1, t1._2 + t2._2)
    }
)

val resultRDD = newRDD.mapValues {
    case (sum, cnt) => sum / cnt
}
sc.stop()

7. join

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同的key对应的所有元素连接在一起的(K,(V,W))的RDD。

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a",1),("b",2),("c",3),("d",4)))
val rdd2 = sc.makeRDD(List(("a",5),("a",6),("e",8),("c",7)))
// join : 两个不同数据源的数据,相同的key的value会连接在一起,形成元组。
//  如果两个数据源中key没有匹配上,那么数据不会出现在结果中。
//  如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔积,数据量会几何性增长,会导致性能降低
rdd.join(rdd2).collect().foreach(println)
sc.stop()

8. sortByKey

在一个(K,V)的RDD上调用,K必须实现ordered接口,返回一个按照key进行排序的(K,V)的RDD

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a",1),("b",2),("c",3),("d",4)))
//按照key对rdd中的元素进行排序,默认升序
rdd.sortByKey().collect().foreach(println)
//降序
rdd.sortByKey(false).collect().foreach(println)
sc.stop()

9. mapValues

针对于(K,V)形式的类型只对V进行操作

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a",1),("b",2),("c",3),("d",4)))
rdd.mapValues("pre_"+_).collect().foreach(println)
sc.stop()

10. cogroup

相同的key,value分组后连接起来。

例子:

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator"))
val rdd = sc.makeRDD(List(("a",1),("b",2),("c",3),("d",4)))
val rdd2 = sc.makeRDD(List(("a",5),("a",6),("e",8),("c",7)))
// cogroup : connect + group (分组,连接)
// 可以有多个参数
rdd.cogroup(rdd2).collect().foreach(println)
sc.stop()

行动算子

1. reduce

聚合RDD中的所有数据,先聚合分区内数据,在聚合分区间数据。

2. collect

采集,该方法会将不同分区间的数据按照分区顺序采集到Driver端,形成数组。

3. count

统计数据个数。

4. first

获取RDD中的第一个元素。

5. take

获取RDD前n个元素组成的数组。

6. takeOrdered

获取RDD排序后的前n个元素组成的数组。

7. aggregate

将每个分区里面的元素通过分区内逻辑和初始值进行聚合,然后用分区间逻辑和初始值(zeroValue)进行操作。注意:分区间逻辑再次使用初始值和aggregateByKey是有区别的。

8. fold

折叠操作,aggregate的简化操作,分区内逻辑和分区间逻辑相同。

9. countByValue

统计每个value的个数

10. countByKey

统计每种key的个数。

11. foreach

遍历RDD中每一个元素。

12. save

(1)saveAsTextFile(path)保存成Text文件
(2)saveAsSequenceFile(path) 保存成Sequencefile文件
(3)saveAsObjectFile(path) 序列化成对象保存到文件

例子:

val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(List(1,2,3,4),2)
    // 行动算子,其实就是触发作业执行的方法
    // 底层代码调用的是环境对象中 runJob 方法,调用dagScheduler的runJob方法创建ActiveJob,并提交执行。

    // reduce
//    val i = rdd.reduce(_ + _)
//    println(i)


    // collect : 采集,该方法会将不同分区间的数据按照分区顺序采集到Driver端,形成数组
//    val ints = rdd.collect()
//    ints.foreach(println)


    // count : 数据源中数据个数
//    val l = rdd.count()
//    println(l)

    // first : 获取数据源中数据的第一个
//    val i = rdd.first()
//    println(i)

    // take : 获取数据源中数据的前N个
//    val ints = rdd.take(2)
//    println(ints.mkString(","))

    // takeOrdered : 数据先排序,再取N个,默认升序排序,可以使用第二个参数列表(比如 : Ordering.Int.reverse)实现倒序功能
//    val rdd1 = sc.makeRDD(List(4,3,2,1))
//    val ints1 = rdd1.takeOrdered(2)(Ordering.Int.reverse)
//    println(ints1.mkString(","))
	
	    //aggregate
//    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
//    println(rdd.aggregate(10)(_ + _, _ + _))

    //fold 是aggregate的简化版
//    rdd.fold(10)(_+_)

    //countByKey 统计每种key出现的次数
//    val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
//    println(rdd.countByKey())
//    val intToLong = rdd.countByValue()
//    println(intToLong)


// save
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)),2)
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
// saveAsSequenceFile 方法要求数据的格式必须为 K-V 键值对类型
rdd.saveAsSequenceFile("output2")
sc.stop()

结语

以上就是帮大家总结的Spark常用算子了。好了,今天就为大家分享到这里了,如果本文对你有帮助的话,欢迎点赞&收藏&分享,这对我继续分享&创作优质文章非常重要。感谢🙏🏻

–END–

非常欢迎大家加我个人微信,有关大数据的问题我们一起讨论。

扫码上方二维码,加我微信

有关一文学完Spark常用算子(Spark算子大全)的更多相关文章

  1. ruby-on-rails - Rails 常用字符串(用于通知和错误信息等) - 2

    大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje

  2. 7个大一C语言必学的程序 / C语言经典代码大全 - 2

    嗨~大家好,这里是可莉!今天给大家带来的是7个C语言的经典基础代码~那一起往下看下去把【程序一】打印100到200之间的素数#includeintmain(){ inti; for(i=100;i 【程序二】输出乘法口诀表#includeintmain(){inti;for(i=1;i 【程序三】判断1000年---2000年之间的闰年#includeintmain(){intyear;for(year=1000;year 【程序四】给定两个整形变量的值,将两个值的内容进行交换。这里提供两种方法来进行交换,第一种为创建临时变量来进行交换,第二种是不创建临时变量而直接进行交换。1.创建临时变量来

  3. 电脑怎么截图?进来看(8种常用截图方法) - 2

    电脑上可以截取图片吗?如果可以,该如何操作呢?相信很多小伙伴都只知道一两种截图的方式,知道的并不全面。其实,电脑上有多种方式截图的,而且非常方便。电脑怎么截图?今天我们就来教大家如何使用电脑截取图片的8种常用方式!操作环境:演示机型:Delloptiplex7050系统版本:Windows10方法一:系统自带截图具体操作:同时按下电脑的自带截图键【Windows+shift+S】,可以选择其中一种方式来截取图片:截屏有矩形截屏、任意形状截屏、窗口截屏和全屏截图。 方法二:QQ截图具体操作:在电脑登录QQ,然后同时按下【Ctrl+Alt+A】,可以任意截图你需要的界面,可以把截图的页面直接下载,

  4. 一文解决关于VLAN所有的疑惑 - 2

    一文解决关于VLAN所有的疑惑VLAN基本概念为什么需要VLAN?怎么在交换机上划分VLAN,VLAN的工作原理有了子网,已经隔离了广播,还需要VLAN干啥?只进行子网划分,不进行VLAN划分VLAN划分与子网划分附加VLAN信息的方法VLAN划分交换机的端口类型(Access和Trunk)一、访问链接二、汇聚链接汇聚链接VLAN间通信为什么要进行VLAN间通信?路由器实现VLAN间通信路由器和交换机的连接方式通信细节三层交换机实现VLAN间通信加速VLAN间通信三层交换机与路由器三层交换机路由器路由器和交换机配合构建LAN的实例使用VLAN设计局域网的特点VLAN增加网络的灵活性不使用VLA

  5. Unity常用文件夹 - 2

    1.Scenes游戏场景文件夹用于放置unity的场景文件 2.Plugins插件文件夹用于放置unity的依赖文件,例如dll 3.Scripts脚本文件夹用于放置unity的c#脚本文件 4.Resources游戏资源文件夹用于放置unity的各种游戏资源,比如images,prefabs,同时只有放到Resources文件夹的游戏资源才能使用Resource.load(资源路径不加后缀)加载到游戏内存中进行使用 5.EditorUnity编辑器扩展脚本文件夹usingUnityEditor;这个名称空间就是Unity编辑器的名称空间这个名称空间提供了扩展Unity编辑器的各种类 【你所有

  6. 一文让你彻底掌握操作符(超详细教程) - 2

    ✅作者简介:大家好,我是小杨📃个人主页:「小杨」的csdn博客🔥系列专栏:小杨带你玩转C语言【初阶】🐳希望大家多多支持🥰一起进步呀!大家好呀!我是小杨。小杨花几天的时间将C语言中的操作符这部分知识做了一个大总结,在方便自己复习的同时也能够帮助到大家。通篇字数在一万字左右,可以算作是非常详细了,一文就可以带领大家彻底掌握操作符这部分内容,文章很长建议先收藏再看,防止下次想看就找不到啦。文章目录✍1,算术操作符✍2,移位操作符    🔍2.1,左移操作符    🔍2.2,右移操作符       ✨2.2.1,算术移位       ✨2.2.2,逻辑移位✍3,位操作符    🔍3.1,按位与&   

  7. Spark的常用SQL日期函数 - 2

    一、获取当前时间1、current_date当前日期(年月日)Examples:SELECTcurrent_date;2、current_timestamp/now()当前日期(时间戳)Examples:SELECTcurrent_timestamp;二、从日期字段中提取时间1、year,month,day/dayofmonth,hour,minute,secondExamples:SELECTyear(now());其他的日期函数以此类推month:1day:12(当月的第几天)dayofmonth:12hour,minute,second:分别对应时分秒2、dayofweek、dayofm

  8. ruby - 将异常处理作为 Ruby 中的常用方法 - 2

    有人能告诉我有没有办法将异常处理作为一种通用方法并在方法中使用它?让我进一步解释一下。例如我有以下方法defadd(num1,num2)beginnum1+num2rescueException=>eraiseeendenddefdivide(num1,num2)beginnum1/num2rescueException=>eraiseeendend如您所见,尽管我的方法只需要一行,但由于异常处理代码,该方法变得更大了。我正在寻找的是一个更slim的解决方案,比如(只是一个想法)defadd(num1,num2)num1+num2unlessraise_exceptionenddefd

  9. 一文掌握软件项目成本预算、估算的方法和成本控制的秘籍 - 2

    每个企业都希望在完成项目后获得盈利,但不少企业到了年终后才发现项目做了不少,公司却并没能达到预期,甚至还出现了亏损。那么钱究竟去了哪里?很多公司都搞不清楚原因,出现糊涂账较多的状况,这将会造成严重的后果,尤其在疫情影响下,大环境很恶劣,如果是大公司的事业部门出现亏损,就可能会导致事业部门解散;如果是小公司出现亏损,就很容易导致公司倒闭;怎样做才能确保我们所完成的项目都能获利?从财务角度看,要确保盈利必须做到合理估算成本,只有这样才能在对外签订合约时做出合理报价,在对内在开始项目前做出充分评估投入代价,同时在实施过程中还要控制成本得当,最后项目结束时才会有可能获得盈利。那么我们怎样才能准确的判断

  10. 【结构与算法】—— 数据结构代码总结 | 数据结构代码大全 - 2

    📢博客主页:https://blog.csdn.net/dxt19980308📢欢迎点赞👍收藏⭐留言📝如有错误敬请指正!📢本文由肩匣与橘编写,首发于CSDN🙉📢生活依旧是美好而又温柔的,你也是✨目录🔴线性表1.1顺序表1.1.1顺序表定义1.1.2顺序表基本操作1.2单链表1.2.1单链表节点定义1.2.2单链表基本操作1.3双链表1.3.1双链表节点定义1.3.2双链表基本操作1.4静态链表🟠栈和队列2.1栈2.1.1顺序栈2.1.2链式栈2.2队列2.2.1顺序队列2.2.2链式队列2.3应用🟡串3.1串的定义与实现3.2串的模式匹配🟢树与二叉树4.1二叉树4.1.1二叉树的概念4.1.2

随机推荐