# Spark RDD 转换算子(Transformations)
我们将RDD数据的转换操作称为**Transformations** ,RDD的所有转换操作都不会被计算,它仅记录作用于RDD上的操作,只有在遇到动作算子(Action)时才会进行计算。
下面是一些常见的转换算子:
```{list-table}
:widths: 10 40
:header-rows: 1
* - 转换
- 说明
* - `map(func)`
- 返回一个新的RDD,该RDD由每一个元素由 *func* 函数转换后组成。
* - `filter(func)`
- 返回一个新的RDD,该RDD由经过 *func* 函数计算后返回值为true的元素组成。
* - `flatMap(func)`
- 类似于 `map`,但是这里的 *func* 函数是将每个元素拆分成为0或多个元素输出,它应该返回一个 `Seq`,而不是一个元素。
* - `mapPartitions(func)`
- 类似于 `map`,但它是在每个分区上运行 *func* 函数,因此当在类型为 *T* 的RDD上运行时,*func* 函数类型必须是 *Iterator[T] => Iterator[U]* 。
* - `mapPartitionsWithIndex(func)`
- 类似于 `mapPartitions`,但它的 *func* 函数多一个当前分片索引号的参数,同 `mapPartitions` ,*func* 函数类型也必须是 *Iterator[T] => Iterator[U]* 。
* - `sample(withReplacement, fraction, seed)`
- 数据抽样算子。
**withReplacement**:放回抽样或不放回抽样,*True* 表示放回抽样。
**fraction**:抽样比例,0-1之间的 *double* 类型参数,eg:0.3表示抽样30%。
**seed**:设置后会根据这个 *seed* 随机抽取。
* - `union(otherRDD)`
- 将两个RDD中的元素进行合并取并集,类型要求一致,返回一个新的RDD。
* - `intersection(otherRDD)`
- 将两个RDD中的元素进行合并取交集,类型要求一样,返回一个新的RDD。
* - `distinct([numPartitions])`
- 将当前RDD进行去重后,返回一个新的RDD。
* - `groupByKey([numPartitions])`
- 作用于Key-Value形式的RDD,将相同Key的值进行聚集,返回一个 *(K, Iterable[V])* 类型的RDD。
* - `reduceByKey(func, [numPartitions])`
- 作用于Key-Value形式的RDD,将相同Key的值使用 *func* 进行reduce聚合,返回新的RDD。
* - `sortByKey([ascending], [numPartitions])`
- 作用于Key-Value形式的RDD,根据Key进行排序。
* - `join(otherRDD, [numPartitions])`
- 作用于Key-Value形式的RDD,将相同Key的数据join在一起。
* - `cogroup(otherRDD, [numPartitions])`
- 作用于Key-Value形式的RDD,将相同key的数据分别聚合成一个集合。
* - `cartesian(otherRDD)`
- 返回两个RDD的笛卡尔积RDD。
* - `pipe(command, [envVars])`
- 执行一个外部脚本,返回输出的RDD。
* - `coalesce(numPartitions)`
- 缩减分区数,用于大数据集过滤后提高小数据集的执行效率。
* - `repartition(numPartitions)`
- 根据传入的分区数重新分区。
* - `repartitionAndSortWithinPartitions(partitioner)`
- 重新分区+排序,这比先分区再排序效率高。
```
## 示例
### map
下面是一个简单的 `map` 示例:
```scala
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println(rdd.map(x => 10 * x).collect.mkString(","))
```
上面代码将输出:
```shell
10,20,30,40,50,60,70,80,90,100
```
### filter
比如分别过滤出1到10中的奇数和偶数,代码如下:
```scala
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println(rdd.filter(x => x % 2 == 0).collect.mkString(","))
println(rdd.filter(x => x % 2 != 0).collect.mkString(","))
```
上面代码将输出:
```shell
2,4,6,8,10
1,3,5,7,9
```
### flatMap
`flatMap` 的功能是将RDD中的每个元素进行拆分。 比如,假设我们读取了文本中的每一行,现在将其按空格拆分成单个词,代码如下:
```scala
val fileRDD = sc.parallelize(List("this is the fist line", "this is the second line"))
println(fileRDD.flatMap(x => x.split(" ")).collect.toSeq)
```
上面代码将输出:
```shell
WrappedArray(this, is, the, fist, line, this, is, the, second, line)
```
### mapPartitions
相对于 `map` 将 *func* 逐条应用于每条数据,`mapPartitions` 是在每个分区上一次性将所有数据应于函数 *func* 。 这样某些情况下具有更高的效率,示例:
```scala
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println(rdd.mapPartitions(_.map(x => 10 * x)).collect.toSeq)
```
上面代码将输出:
```shell
WrappedArray(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)
```
```{note}
虽然 `mapPartitions` 的效率优于 `map`,但是在内存有限的情况下可能会内存溢出。
```
### mapPartitionsWithIndex
类似 `mapPartitions`, 区别是 `mapPartitionsWithIndex` 的 *func* 函数多接受一个回调参数:当前分片的索引号。代码示例:
```scala
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10), numSlices = 4)
println(rdd.mapPartitionsWithIndex((index,items)=>(items.map(x=>(index,x)))).collect().mkString(" - "))
```
上面代码将输出:
```shell
(0,1) - (0,2) - (1,3) - (1,4) - (1,5) - (2,6) - (2,7) - (3,8) - (3,9) - (3,10)
```
### sample
根据传入按比例进行有放回或者不放回的抽样。代码示例:
```scala
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println(rdd.sample(withReplacement = false, 0.3).collect.mkString(" "))
```
上面代码将输出:
```shell
1 6 10
```
### union
将两个RDD中的元素进行合并取并集,类型要求一致,返回一个新的RDD。代码示例:
```scala
val rdd1 = sc.parallelize(1 to 5)
val rdd2= sc.parallelize(6 to 10)
println(rdd1.union(rdd2).collect.mkString(" "))
```
上面代码将输出:
```shell
1 2 3 4 5 6 7 8 9 10
```
### intersection
将两个RDD中的元素进行合并取交集,类型要求一样,返回一个新的RDD。代码示例:
```scala
val rdd1 = sc.parallelize(1 to 5)
val rdd2= sc.parallelize(3 to 8)
println(rdd1.intersection(rdd2).collect.mkString(" "))
```
上面代码将输出:
```shell
4 3 5
```
### distinct
对RDD里的元素进行去重操作。代码示例:
```scala
val rdd = sc.parallelize(Seq(1,1,2,2,3,4))
println(rdd.distinct.collect.mkString(" "))
```
上面代码将输出:
```shell
4 1 3 2
```
### groupByKey
`groupByKey` 用于将 *RDD[K,V]* 中相同 *K* 的值合并到一个集合 *Iterable[V]* 中。代码示例:
```scala
val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("b", 3), ("c", 4), ("c", 5), ("d", 6)))
rdd.groupByKey.collect.foreach(println)
```
上面代码将输出:
```shell
(d,CompactBuffer(6))
(a,CompactBuffer(1))
(b,CompactBuffer(2, 3))
(c,CompactBuffer(4, 5))
```
### reduceByKey
`reduceByKey` 在 *RDD[K,V]* 上调用,返回一个 *RDD[K,V]* ,使用指定的reduce函数,将相同key的值聚合到一起。代码示例:
```scala
val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("b", 3), ("c", 4), ("c", 5), ("d", 6)))
rdd.reduceByKey(_+_).collect.foreach(println)
```
上面代码将输出:
```shell
(d,6)
(a,1)
(b,5)
(c,9)
```
### sortByKey
`sortByKey` 在 *RDD[K,V]* 上调用,返回一个 *RDD[K,V]* ,根据key进行排序,默认为 `ascending: Boolean = true` (”升序“)。代码示例:
```scala
val rdd = sc.parallelize(Seq(("a", 1), ("c", 2), ("b", 3), ("f", 4), ("e", 5), ("d", 6)))
rdd.sortByKey().collect.foreach(println)
```
上面代码将输出:
```shell
(a,1)
(b,3)
(c,2)
(d,6)
(e,5)
(f,4)
```
### join
`join` 、 `fullOuterJoin` 、 `leftOuterJoin` 、 `rightOuterJoin` 都是针对 *RDD[K,V]* 中 *K* 值相同的连接操作,
分别对应内连接、全连接、左连接、右连接。代码示例:
```scala
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
val rdd2 = sc.parallelize(Seq(("b", 5), ("c", 6), ("d", 7), ("e", 8)))
rdd1.join(rdd2).collect.foreach(println)
```
上面代码将输出:
```shell
(d,(4,7))
(b,(2,5))
(c,(3,6))
```
### cogroup
代码示例:
```scala
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(4 to 6)
rdd1.cartesian(rdd2).collect.foreach(println)
```
上面代码将输出:
```shell
(1,4)
(1,5)
(1,6)
(2,4)
(2,5)
(2,6)
(3,4)
(3,5)
(3,6)
```
### coalesce
`coalesce` 用来缩减分区,第二个可选参数 *shuffle* 是减少分区的过程中是否shuffle; *true* 为是, *false* 为否。默认为 *false* 。 代码示例:
```scala
val rdd1 = sc.parallelize(1 to 10, 4)
println("rdd1 NumPartitions: " + rdd1.getNumPartitions)
val rdd2 = rdd1.coalesce(2)
println("rdd2 NumPartitions: " + rdd2.getNumPartitions)
```
上面代码将输出:
```shell
rdd1 NumPartitions: 4
rdd2 NumPartitions: 2
```
### repartition
代码示例:
```scala
val rdd1 = sc.parallelize(1 to 10, 2)
println("rdd1 NumPartitions: " + rdd1.getNumPartitions)
val rdd2 = rdd1.repartition(4)
println("rdd2 NumPartitions: " + rdd2.getNumPartitions)
```
上面代码将输出:
```shell
rdd12 NumPartitions: 2
rdd13 NumPartitions: 4
```
## 完整代码
```scala
package com.sparkexamples.spark.rdd
import org.apache.spark.sql.SparkSession
object RDDTransformationExamples {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[1]")
.appName("SparkExample")
.getOrCreate()
val sc = spark.sparkContext
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println("**** map example: ****")
println(rdd1.map(x => 10 * x).collect.mkString(","))
println("**** filter example: ****")
println(rdd1.filter(x => x % 2 == 0).collect.mkString(","))
println(rdd1.filter(x => x % 2 != 0).collect.mkString(","))
println("**** flatMap example: ****")
val fileRDD = sc.parallelize(List("this is the fist line", "this is the second line"))
println(fileRDD.flatMap(x => x.split(" ")).collect.toSeq)
println("**** mapPartitions example: ****")
println(rdd1.mapPartitions(_.map(x => 10 * x)).collect.toSeq)
println("**** mapPartitionsWithIndex example: ****")
val rdd2 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10), numSlices = 4)
println(rdd2.mapPartitionsWithIndex((index,items)=>(items.map(x=>(index,x)))).collect.mkString(" - "))
println("**** sample example: ****")
println(rdd1.sample(withReplacement = false, 0.3).collect.mkString(" "))
println("**** union example: ****")
println(rdd1.union(rdd2).collect.mkString(" "))
println("**** intersection example: ****")
println(rdd1.intersection(rdd2).collect.mkString(" "))
println("**** distinct example: ****")
val rdd3 = sc.parallelize(Seq(1,1,2,2,3,4))
println(rdd3.distinct.collect.mkString(" "))
val rdd4 = sc.parallelize(Seq(("a", 1), ("b", 2), ("b", 3), ("c", 4), ("c", 5), ("d", 6)))
println("**** groupByKey example: ****")
rdd4.groupByKey.collect.foreach(println)
println("**** reduceByKey example: ****")
rdd4.reduceByKey(_+_).collect.foreach(println)
val rdd5 = sc.parallelize(Seq(("a", 1), ("c", 2), ("b", 3), ("f", 4), ("e", 5), ("d", 6)))
println("**** sortByKey example: ****")
rdd5.sortByKey().collect.foreach(println)
val rdd6 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
val rdd7 = sc.parallelize(Seq(("b", 5), ("c", 6), ("d", 7), ("e", 8)))
println("**** join example: ****")
rdd6.join(rdd7).collect.foreach(println)
println("**** cogroup example: ****")
rdd6.cogroup(rdd7).collect.foreach(println)
val rdd8 = sc.parallelize(1 to 3)
val rdd9 = sc.parallelize(4 to 6)
println("**** cartesian example: ****")
rdd8.cartesian(rdd9).collect.foreach(println)
println("**** coalesce example: ****")
val rdd10 = sc.parallelize(1 to 10, 4)
println("rdd10 NumPartitions: " + rdd10.getNumPartitions)
val rdd11 = rdd10.coalesce(2)
println("rdd11 NumPartitions: " + rdd11.getNumPartitions)
println("**** repartition example: ****")
val rdd12 = sc.parallelize(1 to 10, 2)
println("rdd12 NumPartitions: " + rdd12.getNumPartitions)
val rdd13 = rdd12.repartition(4)
println("rdd13 NumPartitions: " + rdd13.getNumPartitions)
}
}
```
在线[查看](https://github.com/jhao104/spark-examples/blob/main/src/main/scala/com/sparkexamples/spark/rdd/RDDTransformationExamples.scala)。