Spark By Examples¶
Spark — Windows环境安装¶
安装Java¶
Windows上安装Apache Spark需要Java 8或更高版本,你可以从 Oracle 下载相应的Java版本然后安装。 使用OpenJDK可以从这里下载。
下载安装后,请参考Java安装教程配置好环境变量。
Note
本文使用的环境是Java8,同样的步骤也适用于Java11和13版本。
安装Spark¶
访问Spark下载页下载Apache Spark压缩包。 可以使用下拉框选择不同版本的Spark和Hadoop,然后点击第三行的下载链接即可下载。

ApacheSpark下载页¶
下载完成后使用解压工具(推荐7zip)将其解压,然后将解压后的目录 spark-3.0.3-bin-hadoop2.7
复制到 C:\opt\spark-3.0.3-bin-hadoop2.7
。
配置环境变量¶
Windows上安装Apache Spark需要配置 Java_HOME
(安装Java时配置)、 Spark_HOME
、 HADOOP_HOME
和 PATH
环境变量。
根据安装目录需要做如下配置:
SPARK_HOME = C:\opt\spark-3.0.3-bin-hadoop2.7
HADOOP_HOME = C:\opt\spark-3.0.3-bin-hadoop2.7
PATH=%PATH%;%SPARK_HOME%
新建环境变量 SPARK_HOME
:
新建环境变量 HADOOP_HOME
:
将 %SPARK_HOME%
添加至环境变量 PATH
中:
安装 winutils.exe¶
要在Windows上运行Apache Spark,还需要下载 winutils.exe
,可以到这里自行下载对应Hadoop版本的winutils包,然后将其内容复制到 %SPARK_HOME%\bin
目录下。
Note
Hadoop的版本可以在 %SPARK_HOME%
目录中的 RELEASE
文件中看到。
Spark shell¶
Spark shell是Spark发行版中附带的CLI工具。在配置好环境变量后, 在命令行中运行命令 spark-shell
即可打开Spark shell交互式命令行工具。

Spark Shell Command Line¶
默认情况下,spark-shell同时还会拉起一个Web页面,在本例中为http://host.docker.internal:4040 , 可以在浏览器中直接打开。实际地址请详见spark-shell的输出提示。
在spark-shell的命令行中可以运行一些spark语句,如创建RDD、获取spark版本等。
scala> spark.version
res0: String = 3.0.3
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
Spark — Linux环境安装¶
要求¶
这里演示如何在Linux中安装Spark单机版。和在Windows一样,安装Spark需提前安装Java 8或更高版本,参考Spark — Windows环境安装。
下载Spark¶
访问Spark下载页下载Apache Spark压缩包。 可以使用下拉框选择不同版本的Spark和Hadoop,然后点击第三行获取下载链接。

ApacheSpark下载页¶
在命令行使用 wget
命令下载:
wget https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
下载完成后解压至目录 /opt/spark
:
tar -zxvf spark-3.0.3-bin-hadoop2.7.tgz
mv spark-3.0.3-bin-hadoop2.7 /opt/spark
配置环境变量¶
配置Spark环境变量:
[root@bigdata-app]$ vim ~/.bashrc
# 在文件中加入下面两行.
export SPARK_HOME=/opt/spark/spark-3.0.3-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
然后运行下面命令使环境变量生效:
source ~/.bashrc
验证¶
至此Spark已完成在Linux机器上的安装,可以运行 spark-shell
验证是否安装完成,也可以使用 spark-submit
运行一个Spark例子:
spark-submit --class org.apache.spark.examples.SparkPi /opt/spark/spark-3.0.3-bin-hadoop2.7/examples/jars/spark-examples_2.12-3.0.3.jar 10
Spark — 在IntelliJ IDEA上创建项目¶
添加Sbt依赖¶
编辑 build.sbt
文件,添加Spark依赖包:
name := "SparkExamples"
version := "0.1"
scalaVersion := "2.12.2"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.3"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.3"
创建WordCount例子¶
在 src/main/scala
下创建 WordCount.scala
文件,代码如下:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object WordCount {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("WordCountExample")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val textFile = sc.textFile("D:\\Code\\Scala\\word.txt")
val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wordCount.foreach(println)
}
}
右键 WordCount.scala
选择运行即可:
Spark — SparkSession¶
Apache Spark 2.0引入了 SparkSession
,它为用户提供了一个统一的切入点来使用Spark的各项功能,
并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,它减少了用户需要了解的一些概念,使得用户可以很容易地与Spark交互。
SparkSession介绍¶
Spark 2.0引入了一个新类 org.apache.Spark.sql.SparkSession
,它内部封装了我们在2.0之前版本中
使用的所有上下文的组合类( SQLContext
和 HiveContext
等等),使用起来更加方便。
正如前面所讲,SparkSession是Spark程序的切入点,所有Spark程序都应以创建SparkSession实例开始。
使用 SparkSession.builder()
创建SparkSession。
SparkSession包括了以下上下文接口:
Spark Context
SQL Context
Streaming Context
Hive Context
spark-shell中使用¶
Spark Shell默认提供了一个名为 spark
的SparkSession类的实例对象,我们可以在spark-shell中直接使用此对象。
scala> val sqlContext = spark.sqlContext
SparkSession创建¶
在Scala中创建SparkSession,需要使用生成器方法 builder()
然后调用 getOrCreate()
方法。
如果SparkSession已经存在它将直接返回,否则将创建一个新的SparkSession:
val spark = SparkSession.builder()
.master("local[1]")
.appName("SparkExamples")
.config("spark.sql.execution.arrow.enabled", "true")
.getOrCreate();
master:设置运行方式。
local
代表本机运行,local[4]
代表在本机分配4核运行。如果要在集群上运行,通常它可能是yarn
或mesos
, 这取决于你的集群配置;appName:设置spark应用程序的名字,可以在web UI界面看到;
config:额外配置项;
getOrCreate:如果已经存在,则返回SparkSession对象;如果不存在,则创建新对象。
SparkSession常用方法¶
version
:运行应用程序的Spark版本,或者是群集配置的Spark版本builder()
:用来创建SparkSession实例,它返回SparkSession.Builder
createDataFrame()
:将RDD
、Seq
、java.util.List
转换为DataFrame
createDataset()
:将RDD
、Seq
、java.util.List
转换为Dataset
emptyDataFrame()
:创建一个空的DataFrame
emptyDataset()
:创建一个空的Dataset
getActiveSession
:返回当前线程活跃的SparkSessionimplicits
:常用Scala对象转换为DataFrame的隐式转换read()
: 用于将csv、parquet、avro和其他文件格式的内容读取到DataFrame中, 它返回DataFrameReader
实例readStream()
:和read方法类似,不同的是它被用于读取流式数据sparkContext
:返回SparkContext
sql()
:执行SQL语句返回DataFrame数据sqlContext
:返回SQLContext
stop()
:停止当前SparkContexttable()
:返回DataFrame中的表或者视图udf()
:创建Spark UDF(User Defined Functions)
Spark — SparkContext¶
SparkContext在Spark 1.x版本引入,在Spark 2.0引入SparkSession之前,它是Spark应用程序的切入点。
SparkContext介绍¶
在Spark 1.x版本之后,SparkContext就成为Spark应用程序的切入点。
它定义在 org.apache.Spark
包中,用来在集群上创建RDD、累加器、广播变量。
每个JVM里只能存在一个处于active状态的SparkContext,在创建新的SparkContext之前必须调用 stop()
来关闭之前的SparkContext。
每一个Spark应用都是一个SparkContext实例,可以理解为一个SparkContext就是一个Spark应用的生命周期, 一旦SparkContext创建之后,就可以用这个SparkContext来创建RDD、累加器、广播变量。
spark-shell中使用¶
Spark Shell默认提供了一个名为 sc
的SparkContext类的实例对象,我们可以在spark-shell中直接使用此对象。
scala> val rdd = sc.textFile("D:\\Code\\Scala\\word.txt")
SparkContext创建(1.X)¶
在Scala中编写Spark应用程序需要先创建 SparkConf
实例,然后将SparkConf对象作为参数传递给 SparkContext
构造函数来创建SparkContext。
val sparkConf = new SparkConf()
.setAppName("SparkExample")
.setMaster("local[1]")
val sc = new SparkContext(sparkConf)
也可以使用 getOrCreate()
方法创建SparkContext。此函数用于获取或实例化SparkContext,并将其注册为单例对象。
val sc = SparkContext.getOrCreate(sparkConf)
SparkContext创建(2.X之后)¶
Spark 2.0之后,我们主要使用SparkSession,SparkSession中包含了SparkContext中大多数的API,Spark session在内部创建Spark Context并作为变量SparkContext来使用。
val sparkContext = spark.sparkContext
SparkContext常用方法¶
*Accumulator()
:创建累加器变量。Spark内置了三种类型的Accumulator,分别是longAccumulator
用来累加整数型,doubleAccumulator
用来累加浮点型,collectionAccumulator
用来累加集合元素applicationId
:Spark应用的唯一标识appName
:创建SparkContext时设置的AppNamebroadcast()
:向集群广播一个只读变量,广播变量只会被发到各个节点一次emptyRDD()
:创建一个空RDDgetPersistentRDDs
:返回所有持久化的RDD(cache()
)getOrCreate()
:创建或返回SparkContexthadoopFile()
:根据Hadoop文件创建RDDmaster
:创建SparkContext时设置的masternewAPIHadoopFile()
:使用新的API InputFormat为Hadoop文件创建RDDsequenceFile()
:创建具有指定键和值类型的Hadoop SequenceFile的RDDsetLogLevel()
:设置日志级别textFile()
:从HDFS、本地或任何Hadoop文件系统读取文本文件,返回RDDunion()
:Union两个RDDwholeTextFiles()
:从HDFS、本地或任何Hadoop的文件系统读取文件夹中的文本文件,并返回Tuple2的RDD。元组的第一个元素为文件名,第二个元素为文件内容。
Spark RDD — 概述¶
RDD 简介¶
RDD是Spark的基础数据结构。 RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象, 它代表一个不可变的、只读的被分区的数据集。 RDD中的每个数据集被划分为多个逻辑分区,使它们可以在集群中不同节点上被计算。
RDD提供一种基于粗粒度转换(比如 map
, filter
以及 join
)的接口,这些接口会将具体的操作应用到多个数据集上。
这使得它们可以记录创建数据集的“血统”(lineage),而不需要储存真正的数据。 从而达到高效的容错性。
RDD 特点¶
内存计算
不可变
容错性
延迟/惰性计算(lazy evaluation)
分区
并行化
RDD 局限性¶
由于RDD只能通过粗粒度的转换被创建(或者被写), RDD非常适合将相同操作应用在整个数据集的所有的元素上的批处理应用。 RDD不太适合用于需要细粒度的更新共享状态的应用,比如web应用或者网络爬虫应用的存储系统。 对于这些应用,使用传统的纪录更新日志会更加高效。RDD的目标是为批量分析提供一个高效的编程模型。
RDD 创建¶
在开始之前,我们先使用 SparkSession
类中定义的builder方法初始化一个SparkSession。 初始化时需要提供参数 master
和 appName
:
val spark:SparkSession = SparkSession.builder()
.master("local[*]")
.appName("SparkExample")
.getOrCreate()
RDD主要有两种创建方式:
基于parallelize创建:
val dataSeq = Seq(1,2,3,4,5,6,7,8,9,0)
val rdd=spark.sparkContext.parallelize(dataSeq)
基于外部数据源创建(
HDFS
、S3
等):
val rdd2 = spark.sparkContext.textFile("/path/textFile.txt")
创建RDD¶
RDD(Resilient Distributed Dataset)弹性分布式数据集是Spark中最重要的概念之一。它是一个只读的数据集合,之所以称它为弹性的,是因为它们能够在节点发生故障时根据血缘关系重新计算。
通常有下面三种创建RDD的方式:
Parallelize集合创建:接收一个已存在的Scala集合,然后进行各种并行计算。
加载外部数据源创建:外部数据源可以是本地文件或Hadoop文件系统的HDFS等。
由RDD、DataFrame和DataSet转换创建。
Parallelize集合创建¶
可以将已存在的数据集合传递给SparkContext的 parallelize
方法创建RDD,详见Spark Parallelize 方法。
示例:
val rdd = sc.parallelize(Array(1,2,3,4,5))
指定分片数:
val rdd = sc.parallelize(List(1,2,3,4,5),3) // 指定分片数 3
加载外部数据源创建¶
Spark可以从所有Hadoop支持的存储源创建RDD,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3等等。它支持文本格式、SequenceFile和其他所有Hadoop InputFormat。
使用SparkContext的 textFile()
方法创建文本文件RDD。此方法接收文件URI(本地文件路径或 hdfs://
, s3a://
等URI)作为参数,按行读取创建RDD。下面是一个调用示例:
val rdd = sc.textFile("/path/textFile.txt") // 文件路径
关于 textFile()
的更多用法,详见 Spark RDD - textFile。
由RDD转换创建¶
可以使用转换算子如 map
、flatmap
、filter
等从现有RDD创建新RDD。
假设这个场景, 从日志文件中找出含有 ERROR
字符串的行。我们先使用 textFile
加载日志文件创建RDD。然后再使用 filter
方法过滤出目标行,返回一个新的RDD,代码如下:
val logRDD = sc.textFile("/path/log.txt")
val errorRDD = logRDD.filter(_.contains("ERROR"))
由DataFrame和DataSet转换创建¶
DataSet和DataFrame的 rdd
属性将返回当前数据集的RDD形式。
val rdd = spark.range(10).toDF().rdd
Spark Parallelize 方法¶
Parallelize是一种在现有集合(例如数组)上创建RDD的方法。集合中的元素将被复制成为一个分布式数据集,然后我们可以在该数据集上进行并行操作。
Parallelize是Spark中创建RDD的三种方式之一,另外两种方法是:
从外部数据源创建,如本地文件系统、HDFS等等。
从现有RDD、DataSet、DataFrame转换而来。
用法:
sc.parallelize(seq: Seq[T], numSlices: Int)
sc
: SparkContext 对象seq
:集合对象numSlices
:可选参数,创建数据集的分区数。
numSlices
是parallelize的一个重要参数。Spark将为集群的每个分区运行一个任务。默认情况下,
Spark会根据集群情况自动设置分区数。但是,你也可以手动设置它(例如 sc.parallelize(data,10)
)。
示例¶
完整代码在这里查看。
package com.sparkexamples.spark.rdd
import org.apache.spark.sql.SparkSession
object CreateRDDWithParallelize {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[1]")
.appName("SparkExample")
.getOrCreate()
val sc = spark.sparkContext
val rdd1 = sc.parallelize(Array(1,2,3,4,5), 3)
println("elements of rdd1:" + rdd1.collect().mkString(","))
println("number of partitions:" + rdd1.getNumPartitions)
val rdd2 = sc.parallelize(List(6,7,8,9,10))
println("elements of rdd2:" + rdd2.collect().mkString(","))
val rdd3 = rdd1.union(rdd2)
println("elements of rdd3:" + rdd3.collect().mkString(","))
}
}
运行上面代码输出:
elements of rdd1:1,2,3,4,5
number of partitions:3
elements of rdd2:6,7,8,9,10
elements of rdd3:1,2,3,4,5,6,7,8,9,10
SparkContext.textFile读取文件¶
SparkContext提供了 textFile()
方法用于按行读取文本文件,返回RDD。
用法:
sc.textFile(path: String, minPartitions: Int)
sc
: SparkContext 对象path
:本地文件路径或hdfs://
,s3a://
等Hadoop支持的文件系统URIminPartitions
:可选参数,指定数据的最小分区
默认情况下,Spark为文件的每个块创建一个分区(HDFS中一个块默认为128MB),可以通过 minPartitions
参数来设置更多分区。请注意,分区数不能少于块数。
读取多个文件¶
如果要读取的多个指定文件,使用逗号分隔文件名传入 textFile()
即可。
val rdd = sc.textFile("/path/test01.txt,/path/test02.txt")
使用通配符读取多个文件¶
文件路径中可以使用通配符来读取多个文件。
val rdd1 = sc.textFile("/path/*.txt") // 读取路径所有txt文件
val rdd2 = sc.textFile("/path/*/*.txt") // 读取多个目录下的文件
从HDFS读取文件¶
从HDFS中读取文件和读取本地文件一样,只是要在URI中表明是HDFS。上面的所有读取方式也都适用于HDFS。
val rdd = sc.textFile("hdfs://master:9000/examples/data.txt")
完整代码¶
package com.sparkexamples.spark.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object ReadTextFileExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[1]")
.appName("SparkExample")
.getOrCreate()
val sc = spark.sparkContext
// 读取单个文件
val rdd1:RDD[String] = sc.textFile("/path/test01.txt")
rdd1.foreach(println)
// 读取多个文件
val rdd2:RDD[String] = sc.textFile("/path/test01.txt,/path/test02.txt")
rdd2.foreach(println)
// 读取路径下所有文件
val rdd3:RDD[String] = sc.textFile("/path/resources/")
rdd3.foreach(println)
// 通配符示例-读取路径所有txt文件
val rdd4:RDD[String] = sc.textFile("/path/*.txt")
rdd4.foreach(println)
// 通配符示例-读取多个目录下的文件
val rdd5:RDD[String] = sc.textFile("/path/*/*.txt")
rdd5.foreach(println)
// 读取HDFS文件
val rdd6:RDD[String] = sc.textFile("hdfs://master:9000/examples/data.txt")
rdd6.collect.foreach(println)
}
}
在线查看。
Spark RDD 转换算子(Transformations)¶
我们将RDD数据的转换操作称为Transformations ,RDD的所有转换操作都不会被计算,它仅记录作用于RDD上的操作,只有在遇到动作算子(Action)时才会进行计算。
下面是一些常见的转换算子:
转换 |
说明 |
---|---|
|
返回一个新的RDD,该RDD由每一个元素由 func 函数转换后组成。 |
|
返回一个新的RDD,该RDD由经过 func 函数计算后返回值为true的元素组成。 |
|
类似于 |
|
类似于 |
|
类似于 |
|
数据抽样算子。
|
|
将两个RDD中的元素进行合并取并集,类型要求一致,返回一个新的RDD。 |
|
将两个RDD中的元素进行合并取交集,类型要求一样,返回一个新的RDD。 |
|
将当前RDD进行去重后,返回一个新的RDD。 |
|
作用于Key-Value形式的RDD,将相同Key的值进行聚集,返回一个 (K, Iterable[V]) 类型的RDD。 |
|
作用于Key-Value形式的RDD,将相同Key的值使用 func 进行reduce聚合,返回新的RDD。 |
|
作用于Key-Value形式的RDD,根据Key进行排序。 |
|
作用于Key-Value形式的RDD,将相同Key的数据join在一起。 |
|
作用于Key-Value形式的RDD,将相同key的数据分别聚合成一个集合。 |
|
返回两个RDD的笛卡尔积RDD。 |
|
执行一个外部脚本,返回输出的RDD。 |
|
缩减分区数,用于大数据集过滤后提高小数据集的执行效率。 |
|
根据传入的分区数重新分区。 |
|
重新分区+排序,这比先分区再排序效率高。 |
示例¶
map¶
下面是一个简单的 map
示例:
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println(rdd.map(x => 10 * x).collect.mkString(","))
上面代码将输出:
10,20,30,40,50,60,70,80,90,100
filter¶
比如分别过滤出1到10中的奇数和偶数,代码如下:
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println(rdd.filter(x => x % 2 == 0).collect.mkString(","))
println(rdd.filter(x => x % 2 != 0).collect.mkString(","))
上面代码将输出:
2,4,6,8,10
1,3,5,7,9
flatMap¶
flatMap
的功能是将RDD中的每个元素进行拆分。 比如,假设我们读取了文本中的每一行,现在将其按空格拆分成单个词,代码如下:
val fileRDD = sc.parallelize(List("this is the fist line", "this is the second line"))
println(fileRDD.flatMap(x => x.split(" ")).collect.toSeq)
上面代码将输出:
WrappedArray(this, is, the, fist, line, this, is, the, second, line)
mapPartitions¶
相对于 map
将 func 逐条应用于每条数据,mapPartitions
是在每个分区上一次性将所有数据应于函数 func 。 这样某些情况下具有更高的效率,示例:
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println(rdd.mapPartitions(_.map(x => 10 * x)).collect.toSeq)
上面代码将输出:
WrappedArray(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)
Note
虽然 mapPartitions
的效率优于 map
,但是在内存有限的情况下可能会内存溢出。
mapPartitionsWithIndex¶
类似 mapPartitions
, 区别是 mapPartitionsWithIndex
的 func 函数多接受一个回调参数:当前分片的索引号。代码示例:
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10), numSlices = 4)
println(rdd.mapPartitionsWithIndex((index,items)=>(items.map(x=>(index,x)))).collect().mkString(" - "))
上面代码将输出:
(0,1) - (0,2) - (1,3) - (1,4) - (1,5) - (2,6) - (2,7) - (3,8) - (3,9) - (3,10)
sample¶
根据传入按比例进行有放回或者不放回的抽样。代码示例:
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println(rdd.sample(withReplacement = false, 0.3).collect.mkString(" "))
上面代码将输出:
1 6 10
union¶
将两个RDD中的元素进行合并取并集,类型要求一致,返回一个新的RDD。代码示例:
val rdd1 = sc.parallelize(1 to 5)
val rdd2= sc.parallelize(6 to 10)
println(rdd1.union(rdd2).collect.mkString(" "))
上面代码将输出:
1 2 3 4 5 6 7 8 9 10
intersection¶
将两个RDD中的元素进行合并取交集,类型要求一样,返回一个新的RDD。代码示例:
val rdd1 = sc.parallelize(1 to 5)
val rdd2= sc.parallelize(3 to 8)
println(rdd1.intersection(rdd2).collect.mkString(" "))
上面代码将输出:
4 3 5
distinct¶
对RDD里的元素进行去重操作。代码示例:
val rdd = sc.parallelize(Seq(1,1,2,2,3,4))
println(rdd.distinct.collect.mkString(" "))
上面代码将输出:
4 1 3 2
groupByKey¶
groupByKey
用于将 RDD[K,V] 中相同 K 的值合并到一个集合 Iterable[V] 中。代码示例:
val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("b", 3), ("c", 4), ("c", 5), ("d", 6)))
rdd.groupByKey.collect.foreach(println)
上面代码将输出:
(d,CompactBuffer(6))
(a,CompactBuffer(1))
(b,CompactBuffer(2, 3))
(c,CompactBuffer(4, 5))
reduceByKey¶
reduceByKey
在 RDD[K,V] 上调用,返回一个 RDD[K,V] ,使用指定的reduce函数,将相同key的值聚合到一起。代码示例:
val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("b", 3), ("c", 4), ("c", 5), ("d", 6)))
rdd.reduceByKey(_+_).collect.foreach(println)
上面代码将输出:
(d,6)
(a,1)
(b,5)
(c,9)
sortByKey¶
sortByKey
在 RDD[K,V] 上调用,返回一个 RDD[K,V] ,根据key进行排序,默认为 ascending: Boolean = true
(”升序“)。代码示例:
val rdd = sc.parallelize(Seq(("a", 1), ("c", 2), ("b", 3), ("f", 4), ("e", 5), ("d", 6)))
rdd.sortByKey().collect.foreach(println)
上面代码将输出:
(a,1)
(b,3)
(c,2)
(d,6)
(e,5)
(f,4)
join¶
join
、 fullOuterJoin
、 leftOuterJoin
、 rightOuterJoin
都是针对 RDD[K,V] 中 K 值相同的连接操作,
分别对应内连接、全连接、左连接、右连接。代码示例:
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
val rdd2 = sc.parallelize(Seq(("b", 5), ("c", 6), ("d", 7), ("e", 8)))
rdd1.join(rdd2).collect.foreach(println)
上面代码将输出:
(d,(4,7))
(b,(2,5))
(c,(3,6))
cogroup¶
代码示例:
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(4 to 6)
rdd1.cartesian(rdd2).collect.foreach(println)
上面代码将输出:
(1,4)
(1,5)
(1,6)
(2,4)
(2,5)
(2,6)
(3,4)
(3,5)
(3,6)
coalesce¶
coalesce
用来缩减分区,第二个可选参数 shuffle 是减少分区的过程中是否shuffle; true 为是, false 为否。默认为 false 。 代码示例:
val rdd1 = sc.parallelize(1 to 10, 4)
println("rdd1 NumPartitions: " + rdd1.getNumPartitions)
val rdd2 = rdd1.coalesce(2)
println("rdd2 NumPartitions: " + rdd2.getNumPartitions)
上面代码将输出:
rdd1 NumPartitions: 4
rdd2 NumPartitions: 2
repartition¶
代码示例:
val rdd1 = sc.parallelize(1 to 10, 2)
println("rdd1 NumPartitions: " + rdd1.getNumPartitions)
val rdd2 = rdd1.repartition(4)
println("rdd2 NumPartitions: " + rdd2.getNumPartitions)
上面代码将输出:
rdd12 NumPartitions: 2
rdd13 NumPartitions: 4
完整代码¶
package com.sparkexamples.spark.rdd
import org.apache.spark.sql.SparkSession
object RDDTransformationExamples {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[1]")
.appName("SparkExample")
.getOrCreate()
val sc = spark.sparkContext
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println("**** map example: ****")
println(rdd1.map(x => 10 * x).collect.mkString(","))
println("**** filter example: ****")
println(rdd1.filter(x => x % 2 == 0).collect.mkString(","))
println(rdd1.filter(x => x % 2 != 0).collect.mkString(","))
println("**** flatMap example: ****")
val fileRDD = sc.parallelize(List("this is the fist line", "this is the second line"))
println(fileRDD.flatMap(x => x.split(" ")).collect.toSeq)
println("**** mapPartitions example: ****")
println(rdd1.mapPartitions(_.map(x => 10 * x)).collect.toSeq)
println("**** mapPartitionsWithIndex example: ****")
val rdd2 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10), numSlices = 4)
println(rdd2.mapPartitionsWithIndex((index,items)=>(items.map(x=>(index,x)))).collect.mkString(" - "))
println("**** sample example: ****")
println(rdd1.sample(withReplacement = false, 0.3).collect.mkString(" "))
println("**** union example: ****")
println(rdd1.union(rdd2).collect.mkString(" "))
println("**** intersection example: ****")
println(rdd1.intersection(rdd2).collect.mkString(" "))
println("**** distinct example: ****")
val rdd3 = sc.parallelize(Seq(1,1,2,2,3,4))
println(rdd3.distinct.collect.mkString(" "))
val rdd4 = sc.parallelize(Seq(("a", 1), ("b", 2), ("b", 3), ("c", 4), ("c", 5), ("d", 6)))
println("**** groupByKey example: ****")
rdd4.groupByKey.collect.foreach(println)
println("**** reduceByKey example: ****")
rdd4.reduceByKey(_+_).collect.foreach(println)
val rdd5 = sc.parallelize(Seq(("a", 1), ("c", 2), ("b", 3), ("f", 4), ("e", 5), ("d", 6)))
println("**** sortByKey example: ****")
rdd5.sortByKey().collect.foreach(println)
val rdd6 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
val rdd7 = sc.parallelize(Seq(("b", 5), ("c", 6), ("d", 7), ("e", 8)))
println("**** join example: ****")
rdd6.join(rdd7).collect.foreach(println)
println("**** cogroup example: ****")
rdd6.cogroup(rdd7).collect.foreach(println)
val rdd8 = sc.parallelize(1 to 3)
val rdd9 = sc.parallelize(4 to 6)
println("**** cartesian example: ****")
rdd8.cartesian(rdd9).collect.foreach(println)
println("**** coalesce example: ****")
val rdd10 = sc.parallelize(1 to 10, 4)
println("rdd10 NumPartitions: " + rdd10.getNumPartitions)
val rdd11 = rdd10.coalesce(2)
println("rdd11 NumPartitions: " + rdd11.getNumPartitions)
println("**** repartition example: ****")
val rdd12 = sc.parallelize(1 to 10, 2)
println("rdd12 NumPartitions: " + rdd12.getNumPartitions)
val rdd13 = rdd12.repartition(4)
println("rdd13 NumPartitions: " + rdd13.getNumPartitions)
}
}
在线查看。
Spark RDD 行动算子(Actions)¶
正如转换算子中提到的,所有的转换算子都是惰性的,这意味着它们不会立即执行,只有在遇到行动算子时才会触发执行。
Spark中的行动算子分为两类:一类算子执行转换算子生成Scala集合或变量;另一类是将RDD保存到外部文件系统或者数据库中。
下面是一些常见的行动算子:
Action |
说明 |
---|---|
|
使用函数 func 聚合RDD中的所有元素。该函数应该是可交换和关联的。 |
|
以数组形式返回所有的数据。 |
|
返回 RDD 中元素个数。 |
|
返回 RDD 中第一个元素。 |
|
返回 RDD 中前 n 个元素组成的数据。 |
|
返回一个数组,该数组由从 RDD 中随机采样的 num 个元素组成返回。 |
|
按 RDD 的自然顺序或自定义比较器返回 RDD 的前 n 个元素。 |
|
将 RDD 中的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用 |
|
将 RDD 中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以是HDFS或者其他Hadoop支持的文件系统。 |
|
应用于 (K,V) 类型的 RDD ,返回一个 (K,Int) 的map,表示每一个key对应的元素个数。 |
|
在 RDD 中的每一个元素上运行函数 func 。 |
示例¶
reduce¶
reduce
常用来整合RDD中所有数据,例如求和操作。 示例:
val rdd = sc.parallelize(1 to 10)
println(rdd.reduce((x, y) => x + y))
// 输出: 55
上面的函数部分也可简写为:
rdd.reduce(_+_)
collect¶
示例:
val rdd = sc.parallelize(1 to 10)
println(rdd.collect().mkString(" "))
// 输出: 1 2 3 4 5 6 7 8 9 10
Note
collect
应该在 filter
或其他操作后返回一个足够小的数据集时使用。 如果直接将整个数据集 collect
返回,这可能会使driver程序OOM。
count¶
示例:
val rdd = sc.parallelize(1 to 10)
println("elements number:" + rdd.count())
// 输出: elements number:10
first¶
示例:
val rdd = sc.parallelize(1 to 10)
println("the first element:" + rdd.first())
// 输出: the first element:1
take¶
示例:
val rdd = sc.parallelize(1 to 10)
println("the first two element:" + rdd.take(2).mkString(" "))
// 输出: the first two element:1 2
Note
仅当预期生成的数组很小时才应使用此方法,因为所有数据都将加载到driver内存中。
takeSample¶
同 take
方法,该方法仅在预期结果数组很小的情况下使用,因为所有数据都被加载到driver内存中。
语法:
takeSample(withReplacement: Boolean, num: Int, seed: Long)
withReplacement:元素是否可以多次抽样;
num:返回样本大小;
seed:随机数生成器的种子;
示例:
val rdd = sc.parallelize(1 to 10)
println("take two sample element:" + rdd1.takeSample(withReplacement = true,2).mkString(" "))
// 输出: the first two element:3 10
takeOrdered¶
同 take
方法,该方法仅在预期结果数组很小的情况下使用,因为所有数据都被加载到driver内存中。
示例:
val rdd = sc.parallelize(1 to 10)
println("take two element with order:" + rdd.takeOrdered(2).mkString(" "))
// 输出: take two element with order:1 2
countByKey¶
示例:
val rdd = sc.parallelize(Seq("a", "b", "b", "c", "c", "c", "d"))
println(rdd.map(x => (x, 1)).countByKey())
// 输出: Map(d -> 1, a -> 1, b -> 2, c -> 3)
完整代码¶
package com.sparkexamples.spark.rdd
import org.apache.spark.sql.SparkSession
object RDDActionExamples {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[1]")
.appName("SparkExample")
.getOrCreate()
val sc = spark.sparkContext
val rdd1 = sc.parallelize(1 to 10)
println("**** reduce example: ****")
println(rdd1.reduce((x, y) => x + y))
println("**** collect example: ****")
println(rdd1.collect().mkString(" "))
println("**** count example: ****")
println("elements number:" + rdd1.count())
println("**** first example: ****")
println("the first element:" + rdd1.first())
println("**** take example: ****")
println("the first two element:" + rdd1.take(2).mkString(" "))
println("**** takeSample example: ****")
println("take two sample element:" + rdd1.takeSample(withReplacement = true,2).mkString(" "))
println("**** takeOrdered example: ****")
println("take two element with order:" + rdd1.takeOrdered(2).mkString(" "))
println("**** saveAsTextFile example: ****")
rdd1.saveAsTextFile("path/data")
println("**** saveAsTextFile example: ****")
val rdd2 = sc.parallelize(Seq("a", "b", "b", "c", "c", "c", "d"))
println(rdd2.map(x => (x, 1)).countByKey())
}
}
在线查看。
Spark RDD — Stage¶
Spark Stage是计算多个任务的物理执行单元。 Spark处理数据时会根据RDD的依赖关系构建一个有向无环图(DAG),然后根据DAG划分Stage, 在运行时按照步骤执行DAG中的Stage。
DAG¶
有向无环图:DAG(Directed Acyclic Graph)是什么?:
Directed:节点间由边连接在一起,连接顺序由RDD上的调用顺序决定。
Acyclic:节点不能连接成环;即如果操作或转换一旦完成,则无法还原回其原始值。
Graph:Graph指由边和点排列形成的特定图形。点为RDD,边为RDD上调用的算子。
DAGScheduler将Stage划分为多个任务,然后将Stage信息传递给集群管理器,集群管理器触发任务调度器运行任务。 Spark驱动程序将逻辑执行计划转换为物理执行计划。Spark作业以流水线方法执行。
WordCount例子¶
下面是一个Spark入门级的程序WordCount:
val sc = spark.sparkContext
sc.textFile("src/main/resources/test.txt")
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_+_)
.saveAsTextFile("src/main/resources/result")
程序解释:
textFile(...)
: 读取本地文本文件 test.txt ;flatMap(...)
: 将文件中的内容拆分成每个词 ;map(...)
: 将每个词处理成 (word, 1) ,方便后续计数统计 ;reduceByKey(...)
: 按 key 将 value 累加得到每个 key 的数量 ;saveAsTextFile(...)
: 将计算结果写入到本地目的 result ;
Spark应用程序其实就是基于RDD的一系列计算操作,可以在SparkUI上看到如下DAG图:

WordCount DAG¶
从图中可以出,WordCount程序中的5个操作算子被分成了两个Stage。
RDD依赖关系¶
Spark中每个转换算子都会生成一个新的RDD,这样在一系列RDD中就形成了一种前后依赖关系。 RDD中的依赖关系分为窄依赖(Narrow Dependency)与宽依赖(Wide Dependency),下图展示了两种依赖之间的区别:

宽依赖和窄依赖¶
窄依赖:父RDD的分区最多被子RDD的一个分区使用。从子RDD角度看,它精确知道依赖的上级RDD;
宽依赖:父RDD的一个分区会被子RDD的多个分区使用。从子RDD角度看,每个分区都要依赖于父RDD的所有分区;
总体而言,如果父RDD的一个分区只被一个子RDD的一个分区所使用就是窄依赖, 否则就是宽依赖。
窄依赖典型的算子包括 map
、 filter
、 union
等,宽依赖典型的算子包括 groupByKey
、 sortByKey
等。对于连接( join
)操作,分为两种情况:
对输入进行协同划分(co-partitioned),属于窄依赖(如上图左所示侧)。所谓协同划分是指多个父RDD的每个分区的所有”键(key)“,落在子RDD的同一个分区上,不会产生同一个父RDD的某一分区,落在子RDD的两个分区上的情况。
对输入非协同划分(not co-partitioned),属于宽依赖,如上图右侧所示。
划为宽依赖和窄依赖有以下两个好处:
第一,窄依赖可以在单个集群节点中流水线式执行。比如,我们可以将元素应用了
map
操作后紧接着应用filter
操作。与此相反,宽依赖需要父亲RDD的所有分区数据都准备好,并且通过类似于MapReduce的操作将数据在不同的节点之间进行重新洗牌(Shuffle)和网络传输。第二,窄依赖从失败节点中恢复是非常高效的,因为只需要重新计算相对应的父分区数据就可以,而且这个重新计算是在不同的节点并行重计算的,与此相反,在一个含有宽依赖的DAG中,某个节点失败导致一些分区数据的丢失,但是我们需要重新计算父RDD的所有分区的数据。
Stage划分¶
Spark通过分析各个RDD的依赖关系生成了DAG,再通过分析各个RDD的依赖关系来决定如何划分Stage, 具体划分方法是:从DAG图末端出发,逆向遍历整个依赖链,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的Stage中,Stage中task数目由Stage末端的RDD分区个数来决定;

Stage划分¶
将窄依赖尽量划分在同一个Stage中,以实现流水线式计算。例如在上面的WordCount例子中,textFile、flatMap 和 map 被划分为Stage0,由于map到reduceByKey为宽依赖,所以reduceByKey和saveAsTextFile被划分为Stage1。
Spark RDD 持久化¶
Spark可以将RDD持久化到内存或磁盘文件系统中,把RDD持久化到内存中可以极大地提高迭代计算以及各计算模型之间的数据共享。
Spark的开发调优有一个原则,即对多次使用的RDD进行持久化(或称缓存)。如果要对一个RDD进行持久化,只要对这个RDD调用 cache
或 persist
即可。
cache方法¶
语法:
cache(): this.type = persist()
cache()
方法使用非序列化的方式直接将RDD的数据全部尝试持久化到内存中, cache
是 persist
的一个特例。
persist方法¶
语法:
persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
persist(newLevel: StorageLevel): this.type
persist()
方法可以手动传入一个持久化级别进行持久化,无参调用时等同于 cache()
。
持久化级别¶
persist()
方法可以传入一个 StorageLevel(持久化级别),当 StorageLevel 为 MEMORY_ONLY 时就是 cache()
。
所有支持的 StorageLevel 列表定义在 StorageLevel 的伴生对象中,如下:
/**
* Various StorageLevel defined and utility functions for creating new storage levels.
*/
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}
StorageLevel
类的五个初始化参数为:
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
因此可以将上面的持久化级别整理为:
StorageLevel |
磁盘 |
内存 |
OffHeap |
反序列化 |
副本 |
---|---|---|---|---|---|
|
否 |
否 |
否 |
否 |
1 |
|
是 |
否 |
否 |
否 |
2 |
|
是 |
否 |
否 |
否 |
1 |
|
否 |
是 |
否 |
是 |
1 |
|
否 |
是 |
否 |
是 |
2 |
|
否 |
是 |
否 |
否 |
1 |
|
否 |
是 |
否 |
否 |
2 |
|
是 |
是 |
否 |
是 |
1 |
|
是 |
是 |
否 |
是 |
2 |
|
是 |
是 |
否 |
否 |
1 |
|
是 |
是 |
否 |
否 |
1 |
|
是 |
是 |
是 |
否 |
1 |
例如:
MEMORY_ONLY:将RDD以未序列化的Java对象格式储存在内存中。如果内存不够存放所有的数据,则某些分区将不会被缓存。
MEMORY_AND_DISK:将RDD以未序列化的Java对象格式存储在内存中。如果内存不够存放所有的数据,则将剩余存储在磁盘中。
MEMORY_ONLY_SER / MEMORY_AND_DISK_SER:基本含义同 MEMORY_ONLY / MEMORY_AND_DISK 。唯一的区别是,它会将RDD中的数据进行序列化,这样更加节省内存,从而可以避免持久化的数据占用过多内存导致GC。
DISK_ONLY:将RDD以未序列化的Java对象格式写入磁盘文件中。
例子¶
这里以搜狗日志数据为例,计算用户一天的搜索条数,数据从搜狗实验室(http://www.sogou.com/labs/) 下载 。
将下载后的数据包解压上传至HDFS:
hdfs dfs -put SogouQ.reduced /data
然后在spark-shell中执行:
scala> val sc = spark.sparkContext
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@70cc0601
scala> val fileRdd = sc.textFile("hdfs://127.0.0.1:9000/data/SogouQ.reduced", minPartitions = 1)
fileRdd: org.apache.spark.rdd.RDD[String] = hdfs://127.0.0.1:9000/data/SogouQ.reduced MapPartitionsRDD[1] at textFile at <console>:25
scala> fileRdd.cache()
res0: fileRdd.type = hdfs://127.0.0.1:9000/data/SogouQ.reduced MapPartitionsRDD[1] at textFile at <console>:25
scala> fileRdd.count()
res1: Long = 1724264
scala> fileRdd.count()
res2: Long = 1724264
在代码中,第一次 count
时会从HDFS中读取数据,然后将这些数据缓存在内存中;第二次 count
时就直接从内存中读取数据。
可以在UI界面看到,第一次 count
耗时2s,第二次 count
只耗时62ms。

Spark — 共享变量¶
概述¶
在默认情况下,Spark在集群中多个节点运行一个函数时,它会把函数中涉及到的每个变量先创建一份副本,在副本上进行运算。但是有时候需要在多个任务之间共享变量, 或者在任务(Task)和Driver程序之间共享变量。为了满足这种需求,Spark提供了两种类型的变量:广播变量(broadcast variables)和累加器(accumulators)。
广播变量用来把变量在所有节点的内存之间进行共享。累加器则支持在不同节点之间进行累加计算(比如计数或者求和)。
累加器¶
累加器是仅仅被相关操作累加的变量,通常用来实现计数器(counter)和求和(sum)。如前面提到在各任务中同一函数中的变量是不共享的,在不使用累加器的情况下, 程序可能不会返回我们期望的值,例如:
var counter = 0
val rdd = sc.parallelize(Seq(1,2,3,4,5))
rdd.foreach(x => counter += x)
println(counter)
// 输出 0
上面代码并没有输出我们期望的 15
,变量 counter
定义在Driver程序中,在执行时被累加的其实 counter
的副本,最后累加结果也不会更新到Driver程序的 counter
上,所以最后输出的值仍还是 0
。
下面我们使用累加器来稍微修改上面的程序:
val counter = sc.longAccumulator("counter")
val rdd = sc.parallelize(Seq(1,2,3,4,5))
rdd.foreach(x => counter.add(x))
println(counter.value)
// 输出 15
这样使用累加器 longAccumulator
程序便返回了我们期望的值。
SparkContext中提供了三种类型的累加器: longAccumulator
、 doubleAccumulator
、 collectionAccumulator
,在创建时可以传入一个 name
参数,这样方便在Spark UI界面中查看。
累加器也是惰性计算的,只有在RDD进行Action操作时才会被更新,在遇到如Map等惰性操作时不会被立即计算,例如:
val counter = sc.longAccumulator("counter")
val rdd = sc.parallelize(Seq(1,2,3,4,5))
rdd.map{x => counter.add(x); x}
println(counter.value)
// 输出 0
广播变量¶
What is Apache Spark?¶
Apache Spark is an Open source analytical processing engine for large scale powerful distributed data processing and machine learning applications. Spark is Originally developed at the University of California, Berkeley’s, and later donated to Apache Software Foundation. In February 2014, Spark became a Top-Level Apache Project and has been contributed by thousands of engineers and made Spark as one of the most active open-source projects in Apache.
Apache Spark Features¶
In-memory computation
Distributed processing using parallelize
Can be used with many cluster managers (Spark, Yarn, Mesos e.t.c)
Fault-tolerant
Immutable
Lazy evaluation
Cache & persistence
Inbuild-optimization when using DataFrames
Supports ANSI SQL