Spark RDD 转换算子(Transformations)

我们将RDD数据的转换操作称为Transformations ,RDD的所有转换操作都不会被计算,它仅记录作用于RDD上的操作,只有在遇到动作算子(Action)时才会进行计算。

下面是一些常见的转换算子:

转换

说明

map(func)

返回一个新的RDD,该RDD由每一个元素由 func 函数转换后组成。

filter(func)

返回一个新的RDD,该RDD由经过 func 函数计算后返回值为true的元素组成。

flatMap(func)

类似于 map,但是这里的 func 函数是将每个元素拆分成为0或多个元素输出,它应该返回一个 Seq,而不是一个元素。

mapPartitions(func)

类似于 map,但它是在每个分区上运行 func 函数,因此当在类型为 T 的RDD上运行时,func 函数类型必须是 Iterator[T] => Iterator[U]

mapPartitionsWithIndex(func)

类似于 mapPartitions,但它的 func 函数多一个当前分片索引号的参数,同 mapPartitionsfunc 函数类型也必须是 Iterator[T] => Iterator[U]

sample(withReplacement, fraction, seed)

数据抽样算子。
withReplacement:放回抽样或不放回抽样,True 表示放回抽样。
fraction:抽样比例,0-1之间的 double 类型参数,eg:0.3表示抽样30%。
seed:设置后会根据这个 seed 随机抽取。

union(otherRDD)

将两个RDD中的元素进行合并取并集,类型要求一致,返回一个新的RDD。

intersection(otherRDD)

将两个RDD中的元素进行合并取交集,类型要求一样,返回一个新的RDD。

distinct([numPartitions])

将当前RDD进行去重后,返回一个新的RDD。

groupByKey([numPartitions])

作用于Key-Value形式的RDD,将相同Key的值进行聚集,返回一个 (K, Iterable[V]) 类型的RDD。

reduceByKey(func, [numPartitions])

作用于Key-Value形式的RDD,将相同Key的值使用 func 进行reduce聚合,返回新的RDD。

sortByKey([ascending], [numPartitions])

作用于Key-Value形式的RDD,根据Key进行排序。

join(otherRDD, [numPartitions])

作用于Key-Value形式的RDD,将相同Key的数据join在一起。

cogroup(otherRDD, [numPartitions])

作用于Key-Value形式的RDD,将相同key的数据分别聚合成一个集合。

cartesian(otherRDD)

返回两个RDD的笛卡尔积RDD。

pipe(command, [envVars])

执行一个外部脚本,返回输出的RDD。

coalesce(numPartitions)

缩减分区数,用于大数据集过滤后提高小数据集的执行效率。

repartition(numPartitions)

根据传入的分区数重新分区。

repartitionAndSortWithinPartitions(partitioner)

重新分区+排序,这比先分区再排序效率高。

示例

map

下面是一个简单的 map 示例:

val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println(rdd.map(x => 10 * x).collect.mkString(","))

上面代码将输出:

10,20,30,40,50,60,70,80,90,100

filter

比如分别过滤出1到10中的奇数和偶数,代码如下:

val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println(rdd.filter(x => x % 2 == 0).collect.mkString(","))
println(rdd.filter(x => x % 2 != 0).collect.mkString(","))

上面代码将输出:

2,4,6,8,10
1,3,5,7,9

flatMap

flatMap 的功能是将RDD中的每个元素进行拆分。 比如,假设我们读取了文本中的每一行,现在将其按空格拆分成单个词,代码如下:

val fileRDD = sc.parallelize(List("this is the fist line", "this is the second line"))
println(fileRDD.flatMap(x => x.split(" ")).collect.toSeq)

上面代码将输出:

WrappedArray(this, is, the, fist, line, this, is, the, second, line)

mapPartitions

相对于 mapfunc 逐条应用于每条数据,mapPartitions 是在每个分区上一次性将所有数据应于函数 func 。 这样某些情况下具有更高的效率,示例:

val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println(rdd.mapPartitions(_.map(x => 10 * x)).collect.toSeq)

上面代码将输出:

WrappedArray(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)

Note

虽然 mapPartitions 的效率优于 map,但是在内存有限的情况下可能会内存溢出。

mapPartitionsWithIndex

类似 mapPartitions, 区别是 mapPartitionsWithIndexfunc 函数多接受一个回调参数:当前分片的索引号。代码示例:

val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10), numSlices = 4)
println(rdd.mapPartitionsWithIndex((index,items)=>(items.map(x=>(index,x)))).collect().mkString(" - "))

上面代码将输出:

(0,1) - (0,2) - (1,3) - (1,4) - (1,5) - (2,6) - (2,7) - (3,8) - (3,9) - (3,10)

sample

根据传入按比例进行有放回或者不放回的抽样。代码示例:

val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println(rdd.sample(withReplacement = false, 0.3).collect.mkString(" "))

上面代码将输出:

1 6 10

union

将两个RDD中的元素进行合并取并集,类型要求一致,返回一个新的RDD。代码示例:

val rdd1 = sc.parallelize(1 to 5)
val rdd2= sc.parallelize(6 to 10)
println(rdd1.union(rdd2).collect.mkString(" "))

上面代码将输出:

1 2 3 4 5 6 7 8 9 10

intersection

将两个RDD中的元素进行合并取交集,类型要求一样,返回一个新的RDD。代码示例:

val rdd1 = sc.parallelize(1 to 5)
val rdd2= sc.parallelize(3 to 8)
println(rdd1.intersection(rdd2).collect.mkString(" "))

上面代码将输出:

4 3 5

distinct

对RDD里的元素进行去重操作。代码示例:

val rdd = sc.parallelize(Seq(1,1,2,2,3,4))
println(rdd.distinct.collect.mkString(" "))

上面代码将输出:

4 1 3 2

groupByKey

groupByKey 用于将 RDD[K,V] 中相同 K 的值合并到一个集合 Iterable[V] 中。代码示例:

val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("b", 3), ("c", 4), ("c", 5), ("d", 6)))
rdd.groupByKey.collect.foreach(println)

上面代码将输出:

(d,CompactBuffer(6))
(a,CompactBuffer(1))
(b,CompactBuffer(2, 3))
(c,CompactBuffer(4, 5))

reduceByKey

reduceByKeyRDD[K,V] 上调用,返回一个 RDD[K,V] ,使用指定的reduce函数,将相同key的值聚合到一起。代码示例:

val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("b", 3), ("c", 4), ("c", 5), ("d", 6)))
rdd.reduceByKey(_+_).collect.foreach(println)

上面代码将输出:

(d,6)
(a,1)
(b,5)
(c,9)

sortByKey

sortByKeyRDD[K,V] 上调用,返回一个 RDD[K,V] ,根据key进行排序,默认为 ascending: Boolean = true (”升序“)。代码示例:

val rdd = sc.parallelize(Seq(("a", 1), ("c", 2), ("b", 3), ("f", 4), ("e", 5), ("d", 6)))
rdd.sortByKey().collect.foreach(println)

上面代码将输出:

(a,1)
(b,3)
(c,2)
(d,6)
(e,5)
(f,4)

join

joinfullOuterJoinleftOuterJoinrightOuterJoin 都是针对 RDD[K,V]K 值相同的连接操作, 分别对应内连接、全连接、左连接、右连接。代码示例:

val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
val rdd2 = sc.parallelize(Seq(("b", 5), ("c", 6), ("d", 7), ("e", 8)))
rdd1.join(rdd2).collect.foreach(println)

上面代码将输出:

(d,(4,7))
(b,(2,5))
(c,(3,6))

cogroup

代码示例:

val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(4 to 6)
rdd1.cartesian(rdd2).collect.foreach(println)

上面代码将输出:

(1,4)
(1,5)
(1,6)
(2,4)
(2,5)
(2,6)
(3,4)
(3,5)
(3,6)

coalesce

coalesce 用来缩减分区,第二个可选参数 shuffle 是减少分区的过程中是否shuffle; true 为是, false 为否。默认为 false 。 代码示例:

val rdd1 = sc.parallelize(1 to 10, 4)
println("rdd1 NumPartitions: " + rdd1.getNumPartitions)
val rdd2 = rdd1.coalesce(2)
println("rdd2 NumPartitions: " + rdd2.getNumPartitions)

上面代码将输出:

rdd1 NumPartitions: 4
rdd2 NumPartitions: 2

repartition

代码示例:

val rdd1 = sc.parallelize(1 to 10, 2)
println("rdd1 NumPartitions: " + rdd1.getNumPartitions)
val rdd2 = rdd1.repartition(4)
println("rdd2 NumPartitions: " + rdd2.getNumPartitions)

上面代码将输出:

rdd12 NumPartitions: 2
rdd13 NumPartitions: 4

完整代码

package com.sparkexamples.spark.rdd

import org.apache.spark.sql.SparkSession

object RDDTransformationExamples {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SparkExample")
      .getOrCreate()

    val sc = spark.sparkContext

    val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))

    println("**** map example: ****")
    println(rdd1.map(x => 10 * x).collect.mkString(","))

    println("**** filter example: ****")
    println(rdd1.filter(x => x % 2 == 0).collect.mkString(","))
    println(rdd1.filter(x => x % 2 != 0).collect.mkString(","))

    println("**** flatMap example: ****")
    val fileRDD = sc.parallelize(List("this is the fist line", "this is the second line"))
    println(fileRDD.flatMap(x => x.split(" ")).collect.toSeq)

    println("**** mapPartitions example: ****")
    println(rdd1.mapPartitions(_.map(x => 10 * x)).collect.toSeq)

    println("**** mapPartitionsWithIndex example: ****")
    val rdd2 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10), numSlices = 4)
    println(rdd2.mapPartitionsWithIndex((index,items)=>(items.map(x=>(index,x)))).collect.mkString(" - "))

    println("**** sample example: ****")
    println(rdd1.sample(withReplacement = false, 0.3).collect.mkString(" "))

    println("**** union example: ****")
    println(rdd1.union(rdd2).collect.mkString(" "))

    println("**** intersection example: ****")
    println(rdd1.intersection(rdd2).collect.mkString(" "))

    println("**** distinct example: ****")
    val rdd3 = sc.parallelize(Seq(1,1,2,2,3,4))
    println(rdd3.distinct.collect.mkString(" "))

    val rdd4 = sc.parallelize(Seq(("a", 1), ("b", 2), ("b", 3), ("c", 4), ("c", 5), ("d", 6)))
    println("**** groupByKey example: ****")
    rdd4.groupByKey.collect.foreach(println)

    println("**** reduceByKey example: ****")
    rdd4.reduceByKey(_+_).collect.foreach(println)

    val rdd5 = sc.parallelize(Seq(("a", 1), ("c", 2), ("b", 3), ("f", 4), ("e", 5), ("d", 6)))
    println("**** sortByKey example: ****")
    rdd5.sortByKey().collect.foreach(println)

    val rdd6 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
    val rdd7 = sc.parallelize(Seq(("b", 5), ("c", 6), ("d", 7), ("e", 8)))
    println("**** join example: ****")
    rdd6.join(rdd7).collect.foreach(println)

    println("**** cogroup example: ****")
    rdd6.cogroup(rdd7).collect.foreach(println)

    val rdd8 = sc.parallelize(1 to 3)
    val rdd9 = sc.parallelize(4 to 6)
    println("**** cartesian example: ****")
    rdd8.cartesian(rdd9).collect.foreach(println)

    println("**** coalesce example: ****")
    val rdd10 = sc.parallelize(1 to 10, 4)
    println("rdd10 NumPartitions: " + rdd10.getNumPartitions)
    val rdd11 = rdd10.coalesce(2)
    println("rdd11 NumPartitions: " + rdd11.getNumPartitions)

    println("**** repartition example: ****")
    val rdd12 = sc.parallelize(1 to 10, 2)
    println("rdd12 NumPartitions: " + rdd12.getNumPartitions)
    val rdd13 = rdd12.repartition(4)
    println("rdd13 NumPartitions: " + rdd13.getNumPartitions)

  }

}

在线查看