创建RDD¶
RDD(Resilient Distributed Dataset)弹性分布式数据集是Spark中最重要的概念之一。它是一个只读的数据集合,之所以称它为弹性的,是因为它们能够在节点发生故障时根据血缘关系重新计算。
通常有下面三种创建RDD的方式:
Parallelize集合创建:接收一个已存在的Scala集合,然后进行各种并行计算。
加载外部数据源创建:外部数据源可以是本地文件或Hadoop文件系统的HDFS等。
由RDD、DataFrame和DataSet转换创建。
Parallelize集合创建¶
可以将已存在的数据集合传递给SparkContext的 parallelize
方法创建RDD,详见Spark Parallelize 方法。
示例:
val rdd = sc.parallelize(Array(1,2,3,4,5))
指定分片数:
val rdd = sc.parallelize(List(1,2,3,4,5),3) // 指定分片数 3
加载外部数据源创建¶
Spark可以从所有Hadoop支持的存储源创建RDD,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3等等。它支持文本格式、SequenceFile和其他所有Hadoop InputFormat。
使用SparkContext的 textFile()
方法创建文本文件RDD。此方法接收文件URI(本地文件路径或 hdfs://
, s3a://
等URI)作为参数,按行读取创建RDD。下面是一个调用示例:
val rdd = sc.textFile("/path/textFile.txt") // 文件路径
关于 textFile()
的更多用法,详见 Spark RDD - textFile。
由RDD转换创建¶
可以使用转换算子如 map
、flatmap
、filter
等从现有RDD创建新RDD。
假设这个场景, 从日志文件中找出含有 ERROR
字符串的行。我们先使用 textFile
加载日志文件创建RDD。然后再使用 filter
方法过滤出目标行,返回一个新的RDD,代码如下:
val logRDD = sc.textFile("/path/log.txt")
val errorRDD = logRDD.filter(_.contains("ERROR"))
由DataFrame和DataSet转换创建¶
DataSet和DataFrame的 rdd
属性将返回当前数据集的RDD形式。
val rdd = spark.range(10).toDF().rdd