Spark RDD 行动算子(Actions)¶
正如转换算子中提到的,所有的转换算子都是惰性的,这意味着它们不会立即执行,只有在遇到行动算子时才会触发执行。
Spark中的行动算子分为两类:一类算子执行转换算子生成Scala集合或变量;另一类是将RDD保存到外部文件系统或者数据库中。
下面是一些常见的行动算子:
Action |
说明 |
---|---|
|
使用函数 func 聚合RDD中的所有元素。该函数应该是可交换和关联的。 |
|
以数组形式返回所有的数据。 |
|
返回 RDD 中元素个数。 |
|
返回 RDD 中第一个元素。 |
|
返回 RDD 中前 n 个元素组成的数据。 |
|
返回一个数组,该数组由从 RDD 中随机采样的 num 个元素组成返回。 |
|
按 RDD 的自然顺序或自定义比较器返回 RDD 的前 n 个元素。 |
|
将 RDD 中的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用 |
|
将 RDD 中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以是HDFS或者其他Hadoop支持的文件系统。 |
|
应用于 (K,V) 类型的 RDD ,返回一个 (K,Int) 的map,表示每一个key对应的元素个数。 |
|
在 RDD 中的每一个元素上运行函数 func 。 |
示例¶
reduce¶
reduce
常用来整合RDD中所有数据,例如求和操作。 示例:
val rdd = sc.parallelize(1 to 10)
println(rdd.reduce((x, y) => x + y))
// 输出: 55
上面的函数部分也可简写为:
rdd.reduce(_+_)
collect¶
示例:
val rdd = sc.parallelize(1 to 10)
println(rdd.collect().mkString(" "))
// 输出: 1 2 3 4 5 6 7 8 9 10
Note
collect
应该在 filter
或其他操作后返回一个足够小的数据集时使用。 如果直接将整个数据集 collect
返回,这可能会使driver程序OOM。
count¶
示例:
val rdd = sc.parallelize(1 to 10)
println("elements number:" + rdd.count())
// 输出: elements number:10
first¶
示例:
val rdd = sc.parallelize(1 to 10)
println("the first element:" + rdd.first())
// 输出: the first element:1
take¶
示例:
val rdd = sc.parallelize(1 to 10)
println("the first two element:" + rdd.take(2).mkString(" "))
// 输出: the first two element:1 2
Note
仅当预期生成的数组很小时才应使用此方法,因为所有数据都将加载到driver内存中。
takeSample¶
同 take
方法,该方法仅在预期结果数组很小的情况下使用,因为所有数据都被加载到driver内存中。
语法:
takeSample(withReplacement: Boolean, num: Int, seed: Long)
withReplacement:元素是否可以多次抽样;
num:返回样本大小;
seed:随机数生成器的种子;
示例:
val rdd = sc.parallelize(1 to 10)
println("take two sample element:" + rdd1.takeSample(withReplacement = true,2).mkString(" "))
// 输出: the first two element:3 10
takeOrdered¶
同 take
方法,该方法仅在预期结果数组很小的情况下使用,因为所有数据都被加载到driver内存中。
示例:
val rdd = sc.parallelize(1 to 10)
println("take two element with order:" + rdd.takeOrdered(2).mkString(" "))
// 输出: take two element with order:1 2
countByKey¶
示例:
val rdd = sc.parallelize(Seq("a", "b", "b", "c", "c", "c", "d"))
println(rdd.map(x => (x, 1)).countByKey())
// 输出: Map(d -> 1, a -> 1, b -> 2, c -> 3)
完整代码¶
package com.sparkexamples.spark.rdd
import org.apache.spark.sql.SparkSession
object RDDActionExamples {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[1]")
.appName("SparkExample")
.getOrCreate()
val sc = spark.sparkContext
val rdd1 = sc.parallelize(1 to 10)
println("**** reduce example: ****")
println(rdd1.reduce((x, y) => x + y))
println("**** collect example: ****")
println(rdd1.collect().mkString(" "))
println("**** count example: ****")
println("elements number:" + rdd1.count())
println("**** first example: ****")
println("the first element:" + rdd1.first())
println("**** take example: ****")
println("the first two element:" + rdd1.take(2).mkString(" "))
println("**** takeSample example: ****")
println("take two sample element:" + rdd1.takeSample(withReplacement = true,2).mkString(" "))
println("**** takeOrdered example: ****")
println("take two element with order:" + rdd1.takeOrdered(2).mkString(" "))
println("**** saveAsTextFile example: ****")
rdd1.saveAsTextFile("path/data")
println("**** saveAsTextFile example: ****")
val rdd2 = sc.parallelize(Seq("a", "b", "b", "c", "c", "c", "d"))
println(rdd2.map(x => (x, 1)).countByKey())
}
}
在线查看。