_images/apachespark.svg

Spark By Examples | Learn Spark Tutorial with Examples.

Spark By Examples

Spark — Windows环境安装

安装Java

Windows上安装Apache Spark需要Java 8或更高版本,你可以从 Oracle 下载相应的Java版本然后安装。 使用OpenJDK可以从这里下载。

下载安装后,请参考Java安装教程配置好环境变量。

Note

本文使用的环境是Java8,同样的步骤也适用于Java11和13版本。

安装Spark

访问Spark下载页下载Apache Spark压缩包。 可以使用下拉框选择不同版本的Spark和Hadoop,然后点击第三行的下载链接即可下载。

Spark Download Page

ApacheSpark下载页

下载完成后使用解压工具(推荐7zip)将其解压,然后将解压后的目录 spark-3.0.3-bin-hadoop2.7 复制到 C:\opt\spark-3.0.3-bin-hadoop2.7

配置环境变量

Windows上安装Apache Spark需要配置 Java_HOME(安装Java时配置)、 Spark_HOMEHADOOP_HOMEPATH 环境变量。 根据安装目录需要做如下配置:

SPARK_HOME = C:\opt\spark-3.0.3-bin-hadoop2.7
HADOOP_HOME = C:\opt\spark-3.0.3-bin-hadoop2.7
PATH=%PATH%;%SPARK_HOME%

新建环境变量 SPARK_HOME:

Environment Variables

新建环境变量 HADOOP_HOME:

Environment Variables

%SPARK_HOME% 添加至环境变量 PATH 中:

Environment Variables

安装 winutils.exe

要在Windows上运行Apache Spark,还需要下载 winutils.exe,可以到这里自行下载对应Hadoop版本的winutils包,然后将其内容复制到 %SPARK_HOME%\bin 目录下。

Note

Hadoop的版本可以在 %SPARK_HOME% 目录中的 RELEASE 文件中看到。

Spark shell

Spark shell是Spark发行版中附带的CLI工具。在配置好环境变量后, 在命令行中运行命令 spark-shell 即可打开Spark shell交互式命令行工具。

Spark Shell Command Line

Spark Shell Command Line

默认情况下,spark-shell同时还会拉起一个Web页面,在本例中为http://host.docker.internal:4040 , 可以在浏览器中直接打开。实际地址请详见spark-shell的输出提示。

在spark-shell的命令行中可以运行一些spark语句,如创建RDD、获取spark版本等。

scala> spark.version
res0: String = 3.0.3

scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

Spark — Linux环境安装

要求

这里演示如何在Linux中安装Spark单机版。和在Windows一样,安装Spark需提前安装Java 8或更高版本,参考Spark — Windows环境安装

下载Spark

访问Spark下载页下载Apache Spark压缩包。 可以使用下拉框选择不同版本的Spark和Hadoop,然后点击第三行获取下载链接。

Spark Download Page

ApacheSpark下载页

在命令行使用 wget 命令下载:

wget https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz

下载完成后解压至目录 /opt/spark:

tar -zxvf spark-3.0.3-bin-hadoop2.7.tgz 
mv spark-3.0.3-bin-hadoop2.7 /opt/spark

配置环境变量

配置Spark环境变量:

[root@bigdata-app]$ vim ~/.bashrc 
# 在文件中加入下面两行.
export SPARK_HOME=/opt/spark/spark-3.0.3-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin

然后运行下面命令使环境变量生效:

source ~/.bashrc 

验证

至此Spark已完成在Linux机器上的安装,可以运行 spark-shell 验证是否安装完成,也可以使用 spark-submit 运行一个Spark例子:

spark-submit --class org.apache.spark.examples.SparkPi /opt/spark/spark-3.0.3-bin-hadoop2.7/examples/jars/spark-examples_2.12-3.0.3.jar 10

Spark — 在IntelliJ IDEA上创建项目

安装Scala插件

推荐使用Scala编写Spark应用,在IDEA使用Scala需要先安装Scala插件。 位置: File > Settings > Plugins

IDEA Scala Plugin

安装Scala插件

创建Scala项目

新建工程, 选择 Scala > sbt

Create Scala Project

设置项目名称和路径,选择Scala和SBT版本,点击Finish即完成创建:

Create Scala Project

添加Sbt依赖

编辑 build.sbt 文件,添加Spark依赖包:

name := "SparkExamples"

version := "0.1"

scalaVersion := "2.12.2"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.3"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.3"

创建WordCount例子

src/main/scala 下创建 WordCount.scala 文件,代码如下:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object WordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("WordCountExample")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile("D:\\Code\\Scala\\word.txt")
    val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCount.foreach(println)
  }
}

右键 WordCount.scala 选择运行即可:

Run Word Count

Spark — SparkSession

Apache Spark 2.0引入了 SparkSession ,它为用户提供了一个统一的切入点来使用Spark的各项功能, 并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,它减少了用户需要了解的一些概念,使得用户可以很容易地与Spark交互。

SparkSession介绍

Spark 2.0引入了一个新类 org.apache.Spark.sql.SparkSession ,它内部封装了我们在2.0之前版本中 使用的所有上下文的组合类( SQLContextHiveContext 等等),使用起来更加方便。

正如前面所讲,SparkSession是Spark程序的切入点,所有Spark程序都应以创建SparkSession实例开始。 使用 SparkSession.builder() 创建SparkSession。

SparkSession包括了以下上下文接口:

  • Spark Context

  • SQL Context

  • Streaming Context

  • Hive Context

spark-shell中使用

Spark Shell默认提供了一个名为 spark 的SparkSession类的实例对象,我们可以在spark-shell中直接使用此对象。

scala> val sqlContext = spark.sqlContext

SparkSession创建

在Scala中创建SparkSession,需要使用生成器方法 builder() 然后调用 getOrCreate()方法。 如果SparkSession已经存在它将直接返回,否则将创建一个新的SparkSession:

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("SparkExamples")
  .config("spark.sql.execution.arrow.enabled", "true")
  .getOrCreate();
  • master:设置运行方式。 local 代表本机运行,local[4] 代表在本机分配4核运行。如果要在集群上运行,通常它可能是 yarnmesos, 这取决于你的集群配置;

  • appName:设置spark应用程序的名字,可以在web UI界面看到;

  • config:额外配置项;

  • getOrCreate:如果已经存在,则返回SparkSession对象;如果不存在,则创建新对象。

SparkSession常用方法

  • version:运行应用程序的Spark版本,或者是群集配置的Spark版本

  • builder():用来创建SparkSession实例,它返回 SparkSession.Builder

  • createDataFrame():将 RDDSeqjava.util.List 转换为 DataFrame

  • createDataset():将 RDDSeqjava.util.List 转换为 Dataset

  • emptyDataFrame():创建一个空的 DataFrame

  • emptyDataset():创建一个空的 Dataset

  • getActiveSession:返回当前线程活跃的SparkSession

  • implicits:常用Scala对象转换为DataFrame的隐式转换

  • read(): 用于将csv、parquet、avro和其他文件格式的内容读取到DataFrame中, 它返回 DataFrameReader 实例

  • readStream():和read方法类似,不同的是它被用于读取流式数据

  • sparkContext:返回 SparkContext

  • sql():执行SQL语句返回DataFrame数据

  • sqlContext:返回 SQLContext

  • stop():停止当前SparkContext

  • table():返回DataFrame中的表或者视图

  • udf():创建Spark UDF(User Defined Functions)

Spark — SparkContext

SparkContext在Spark 1.x版本引入,在Spark 2.0引入SparkSession之前,它是Spark应用程序的切入点。

SparkContext介绍

在Spark 1.x版本之后,SparkContext就成为Spark应用程序的切入点。 它定义在 org.apache.Spark 包中,用来在集群上创建RDD、累加器、广播变量。

每个JVM里只能存在一个处于active状态的SparkContext,在创建新的SparkContext之前必须调用 stop() 来关闭之前的SparkContext。

每一个Spark应用都是一个SparkContext实例,可以理解为一个SparkContext就是一个Spark应用的生命周期, 一旦SparkContext创建之后,就可以用这个SparkContext来创建RDD、累加器、广播变量。

spark-shell中使用

Spark Shell默认提供了一个名为 sc 的SparkContext类的实例对象,我们可以在spark-shell中直接使用此对象。

scala> val rdd = sc.textFile("D:\\Code\\Scala\\word.txt")

SparkContext创建(1.X)

在Scala中编写Spark应用程序需要先创建 SparkConf 实例,然后将SparkConf对象作为参数传递给 SparkContext 构造函数来创建SparkContext。

val sparkConf = new SparkConf()
  .setAppName("SparkExample")
  .setMaster("local[1]")
val sc = new SparkContext(sparkConf)

也可以使用 getOrCreate() 方法创建SparkContext。此函数用于获取或实例化SparkContext,并将其注册为单例对象。

val sc = SparkContext.getOrCreate(sparkConf)

SparkContext创建(2.X之后)

Spark 2.0之后,我们主要使用SparkSession,SparkSession中包含了SparkContext中大多数的API,Spark session在内部创建Spark Context并作为变量SparkContext来使用。

val sparkContext = spark.sparkContext

SparkContext常用方法

  • *Accumulator():创建累加器变量。Spark内置了三种类型的Accumulator,分别是 longAccumulator 用来累加整数型, doubleAccumulator 用来累加浮点型, collectionAccumulator 用来累加集合元素

  • applicationId:Spark应用的唯一标识

  • appName:创建SparkContext时设置的AppName

  • broadcast():向集群广播一个只读变量,广播变量只会被发到各个节点一次

  • emptyRDD():创建一个空RDD

  • getPersistentRDDs:返回所有持久化的RDD(cache())

  • getOrCreate():创建或返回SparkContext

  • hadoopFile():根据Hadoop文件创建RDD

  • master:创建SparkContext时设置的master

  • newAPIHadoopFile():使用新的API InputFormat为Hadoop文件创建RDD

  • sequenceFile():创建具有指定键和值类型的Hadoop SequenceFile的RDD

  • setLogLevel():设置日志级别

  • textFile():从HDFS、本地或任何Hadoop文件系统读取文本文件,返回RDD

  • union():Union两个RDD

  • wholeTextFiles():从HDFS、本地或任何Hadoop的文件系统读取文件夹中的文本文件,并返回Tuple2的RDD。元组的第一个元素为文件名,第二个元素为文件内容。

Spark RDD — 概述

RDD 简介

RDD是Spark的基础数据结构。 RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象, 它代表一个不可变的、只读的被分区的数据集。 RDD中的每个数据集被划分为多个逻辑分区,使它们可以在集群中不同节点上被计算。

RDD提供一种基于粗粒度转换(比如 mapfilter 以及 join )的接口,这些接口会将具体的操作应用到多个数据集上。 这使得它们可以记录创建数据集的“血统”(lineage),而不需要储存真正的数据。 从而达到高效的容错性。

RDD 特点

  • 内存计算

  • 不可变

  • 容错性

  • 延迟/惰性计算(lazy evaluation)

  • 分区

  • 并行化

RDD 局限性

由于RDD只能通过粗粒度的转换被创建(或者被写), RDD非常适合将相同操作应用在整个数据集的所有的元素上的批处理应用。 RDD不太适合用于需要细粒度的更新共享状态的应用,比如web应用或者网络爬虫应用的存储系统。 对于这些应用,使用传统的纪录更新日志会更加高效。RDD的目标是为批量分析提供一个高效的编程模型。

RDD 创建

在开始之前,我们先使用 SparkSession 类中定义的builder方法初始化一个SparkSession。 初始化时需要提供参数 masterappName

val spark:SparkSession = SparkSession.builder()
  .master("local[*]")
  .appName("SparkExample")
  .getOrCreate()  

RDD主要有两种创建方式:

  • 基于parallelize创建:

val dataSeq = Seq(1,2,3,4,5,6,7,8,9,0)
val rdd=spark.sparkContext.parallelize(dataSeq)
  • 基于外部数据源创建(HDFSS3 等):

val rdd2 = spark.sparkContext.textFile("/path/textFile.txt")

创建RDD

RDD(Resilient Distributed Dataset)弹性分布式数据集是Spark中最重要的概念之一。它是一个只读的数据集合,之所以称它为弹性的,是因为它们能够在节点发生故障时根据血缘关系重新计算。

通常有下面三种创建RDD的方式:

Parallelize集合创建

可以将已存在的数据集合传递给SparkContext的 parallelize 方法创建RDD,详见Spark Parallelize 方法

  • 示例:

val rdd = sc.parallelize(Array(1,2,3,4,5))
  • 指定分片数:

val rdd = sc.parallelize(List(1,2,3,4,5),3)  // 指定分片数 3

加载外部数据源创建

Spark可以从所有Hadoop支持的存储源创建RDD,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3等等。它支持文本格式、SequenceFile和其他所有Hadoop InputFormat。

使用SparkContext的 textFile() 方法创建文本文件RDD。此方法接收文件URI(本地文件路径或 hdfs://s3a:// 等URI)作为参数,按行读取创建RDD。下面是一个调用示例:

val rdd = sc.textFile("/path/textFile.txt")  // 文件路径

关于 textFile() 的更多用法,详见 Spark RDD - textFile

由RDD转换创建

可以使用转换算子如 mapflatmapfilter等从现有RDD创建新RDD。

假设这个场景, 从日志文件中找出含有 ERROR 字符串的行。我们先使用 textFile 加载日志文件创建RDD。然后再使用 filter 方法过滤出目标行,返回一个新的RDD,代码如下:

val logRDD = sc.textFile("/path/log.txt")
val errorRDD = logRDD.filter(_.contains("ERROR"))

由DataFrame和DataSet转换创建

DataSet和DataFrame的 rdd 属性将返回当前数据集的RDD形式。

val rdd = spark.range(10).toDF().rdd

Spark Parallelize 方法

Parallelize是一种在现有集合(例如数组)上创建RDD的方法。集合中的元素将被复制成为一个分布式数据集,然后我们可以在该数据集上进行并行操作。

Parallelize是Spark中创建RDD的三种方式之一,另外两种方法是:

  • 从外部数据源创建,如本地文件系统、HDFS等等。

  • 从现有RDD、DataSet、DataFrame转换而来。

用法:

sc.parallelize(seq: Seq[T], numSlices: Int)
  • sc: SparkContext 对象

  • seq:集合对象

  • numSlices:可选参数,创建数据集的分区数。

numSlices是parallelize的一个重要参数。Spark将为集群的每个分区运行一个任务。默认情况下, Spark会根据集群情况自动设置分区数。但是,你也可以手动设置它(例如 sc.parallelize(data,10))。

示例

完整代码在这里查看。

package com.sparkexamples.spark.rdd

import org.apache.spark.sql.SparkSession

object CreateRDDWithParallelize {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SparkExample")
      .getOrCreate()

    val sc = spark.sparkContext

    val rdd1 = sc.parallelize(Array(1,2,3,4,5), 3)
    println("elements of rdd1:" + rdd1.collect().mkString(","))
    println("number of partitions:" + rdd1.getNumPartitions)

    val rdd2 = sc.parallelize(List(6,7,8,9,10))
    println("elements of rdd2:" + rdd2.collect().mkString(","))

    val rdd3 = rdd1.union(rdd2)
    println("elements of rdd3:" + rdd3.collect().mkString(","))

  }

}

运行上面代码输出:

elements of rdd1:1,2,3,4,5
number of partitions:3
elements of rdd2:6,7,8,9,10
elements of rdd3:1,2,3,4,5,6,7,8,9,10

SparkContext.textFile读取文件

SparkContext提供了 textFile() 方法用于按行读取文本文件,返回RDD。

用法:

sc.textFile(path: String, minPartitions: Int)
  • sc: SparkContext 对象

  • path:本地文件路径或 hdfs://s3a:// 等Hadoop支持的文件系统URI

  • minPartitions:可选参数,指定数据的最小分区

默认情况下,Spark为文件的每个块创建一个分区(HDFS中一个块默认为128MB),可以通过 minPartitions 参数来设置更多分区。请注意,分区数不能少于块数。

读取单个文件

直接将文件路径作为 path 参数传入 textFile() 读取单个文件,返回每行内容的RDD。

val rdd = sc.textFile("/path/text.txt")

读取多个文件

如果要读取的多个指定文件,使用逗号分隔文件名传入 textFile() 即可。

val rdd = sc.textFile("/path/test01.txt,/path/test02.txt")

读取路径下所有文件

path 可传入文件路径来读取路径下所有文件。

val rdd = sc.textFile("/path/")

使用通配符读取多个文件

文件路径中可以使用通配符来读取多个文件。

val rdd1 = sc.textFile("/path/*.txt")  // 读取路径所有txt文件
val rdd2 = sc.textFile("/path/*/*.txt")  // 读取多个目录下的文件

从HDFS读取文件

从HDFS中读取文件和读取本地文件一样,只是要在URI中表明是HDFS。上面的所有读取方式也都适用于HDFS。

val rdd = sc.textFile("hdfs://master:9000/examples/data.txt")

完整代码

package com.sparkexamples.spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object ReadTextFileExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SparkExample")
      .getOrCreate()

    val sc = spark.sparkContext

    // 读取单个文件
    val rdd1:RDD[String] = sc.textFile("/path/test01.txt")
    rdd1.foreach(println)

    // 读取多个文件
    val rdd2:RDD[String] = sc.textFile("/path/test01.txt,/path/test02.txt")
    rdd2.foreach(println)

    // 读取路径下所有文件
    val rdd3:RDD[String] = sc.textFile("/path/resources/")
    rdd3.foreach(println)

    // 通配符示例-读取路径所有txt文件
    val rdd4:RDD[String] = sc.textFile("/path/*.txt")
    rdd4.foreach(println)

    // 通配符示例-读取多个目录下的文件
    val rdd5:RDD[String] = sc.textFile("/path/*/*.txt")
    rdd5.foreach(println)

    // 读取HDFS文件
    val rdd6:RDD[String] = sc.textFile("hdfs://master:9000/examples/data.txt")
    rdd6.collect.foreach(println)
  }
}

在线查看

Spark RDD 转换算子(Transformations)

我们将RDD数据的转换操作称为Transformations ,RDD的所有转换操作都不会被计算,它仅记录作用于RDD上的操作,只有在遇到动作算子(Action)时才会进行计算。

下面是一些常见的转换算子:

转换

说明

map(func)

返回一个新的RDD,该RDD由每一个元素由 func 函数转换后组成。

filter(func)

返回一个新的RDD,该RDD由经过 func 函数计算后返回值为true的元素组成。

flatMap(func)

类似于 map,但是这里的 func 函数是将每个元素拆分成为0或多个元素输出,它应该返回一个 Seq,而不是一个元素。

mapPartitions(func)

类似于 map,但它是在每个分区上运行 func 函数,因此当在类型为 T 的RDD上运行时,func 函数类型必须是 Iterator[T] => Iterator[U]

mapPartitionsWithIndex(func)

类似于 mapPartitions,但它的 func 函数多一个当前分片索引号的参数,同 mapPartitionsfunc 函数类型也必须是 Iterator[T] => Iterator[U]

sample(withReplacement, fraction, seed)

数据抽样算子。
withReplacement:放回抽样或不放回抽样,True 表示放回抽样。
fraction:抽样比例,0-1之间的 double 类型参数,eg:0.3表示抽样30%。
seed:设置后会根据这个 seed 随机抽取。

union(otherRDD)

将两个RDD中的元素进行合并取并集,类型要求一致,返回一个新的RDD。

intersection(otherRDD)

将两个RDD中的元素进行合并取交集,类型要求一样,返回一个新的RDD。

distinct([numPartitions])

将当前RDD进行去重后,返回一个新的RDD。

groupByKey([numPartitions])

作用于Key-Value形式的RDD,将相同Key的值进行聚集,返回一个 (K, Iterable[V]) 类型的RDD。

reduceByKey(func, [numPartitions])

作用于Key-Value形式的RDD,将相同Key的值使用 func 进行reduce聚合,返回新的RDD。

sortByKey([ascending], [numPartitions])

作用于Key-Value形式的RDD,根据Key进行排序。

join(otherRDD, [numPartitions])

作用于Key-Value形式的RDD,将相同Key的数据join在一起。

cogroup(otherRDD, [numPartitions])

作用于Key-Value形式的RDD,将相同key的数据分别聚合成一个集合。

cartesian(otherRDD)

返回两个RDD的笛卡尔积RDD。

pipe(command, [envVars])

执行一个外部脚本,返回输出的RDD。

coalesce(numPartitions)

缩减分区数,用于大数据集过滤后提高小数据集的执行效率。

repartition(numPartitions)

根据传入的分区数重新分区。

repartitionAndSortWithinPartitions(partitioner)

重新分区+排序,这比先分区再排序效率高。

示例

map

下面是一个简单的 map 示例:

val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println(rdd.map(x => 10 * x).collect.mkString(","))

上面代码将输出:

10,20,30,40,50,60,70,80,90,100

filter

比如分别过滤出1到10中的奇数和偶数,代码如下:

val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println(rdd.filter(x => x % 2 == 0).collect.mkString(","))
println(rdd.filter(x => x % 2 != 0).collect.mkString(","))

上面代码将输出:

2,4,6,8,10
1,3,5,7,9

flatMap

flatMap 的功能是将RDD中的每个元素进行拆分。 比如,假设我们读取了文本中的每一行,现在将其按空格拆分成单个词,代码如下:

val fileRDD = sc.parallelize(List("this is the fist line", "this is the second line"))
println(fileRDD.flatMap(x => x.split(" ")).collect.toSeq)

上面代码将输出:

WrappedArray(this, is, the, fist, line, this, is, the, second, line)

mapPartitions

相对于 mapfunc 逐条应用于每条数据,mapPartitions 是在每个分区上一次性将所有数据应于函数 func 。 这样某些情况下具有更高的效率,示例:

val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println(rdd.mapPartitions(_.map(x => 10 * x)).collect.toSeq)

上面代码将输出:

WrappedArray(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)

Note

虽然 mapPartitions 的效率优于 map,但是在内存有限的情况下可能会内存溢出。

mapPartitionsWithIndex

类似 mapPartitions, 区别是 mapPartitionsWithIndexfunc 函数多接受一个回调参数:当前分片的索引号。代码示例:

val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10), numSlices = 4)
println(rdd.mapPartitionsWithIndex((index,items)=>(items.map(x=>(index,x)))).collect().mkString(" - "))

上面代码将输出:

(0,1) - (0,2) - (1,3) - (1,4) - (1,5) - (2,6) - (2,7) - (3,8) - (3,9) - (3,10)

sample

根据传入按比例进行有放回或者不放回的抽样。代码示例:

val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
println(rdd.sample(withReplacement = false, 0.3).collect.mkString(" "))

上面代码将输出:

1 6 10

union

将两个RDD中的元素进行合并取并集,类型要求一致,返回一个新的RDD。代码示例:

val rdd1 = sc.parallelize(1 to 5)
val rdd2= sc.parallelize(6 to 10)
println(rdd1.union(rdd2).collect.mkString(" "))

上面代码将输出:

1 2 3 4 5 6 7 8 9 10

intersection

将两个RDD中的元素进行合并取交集,类型要求一样,返回一个新的RDD。代码示例:

val rdd1 = sc.parallelize(1 to 5)
val rdd2= sc.parallelize(3 to 8)
println(rdd1.intersection(rdd2).collect.mkString(" "))

上面代码将输出:

4 3 5

distinct

对RDD里的元素进行去重操作。代码示例:

val rdd = sc.parallelize(Seq(1,1,2,2,3,4))
println(rdd.distinct.collect.mkString(" "))

上面代码将输出:

4 1 3 2

groupByKey

groupByKey 用于将 RDD[K,V] 中相同 K 的值合并到一个集合 Iterable[V] 中。代码示例:

val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("b", 3), ("c", 4), ("c", 5), ("d", 6)))
rdd.groupByKey.collect.foreach(println)

上面代码将输出:

(d,CompactBuffer(6))
(a,CompactBuffer(1))
(b,CompactBuffer(2, 3))
(c,CompactBuffer(4, 5))

reduceByKey

reduceByKeyRDD[K,V] 上调用,返回一个 RDD[K,V] ,使用指定的reduce函数,将相同key的值聚合到一起。代码示例:

val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("b", 3), ("c", 4), ("c", 5), ("d", 6)))
rdd.reduceByKey(_+_).collect.foreach(println)

上面代码将输出:

(d,6)
(a,1)
(b,5)
(c,9)

sortByKey

sortByKeyRDD[K,V] 上调用,返回一个 RDD[K,V] ,根据key进行排序,默认为 ascending: Boolean = true (”升序“)。代码示例:

val rdd = sc.parallelize(Seq(("a", 1), ("c", 2), ("b", 3), ("f", 4), ("e", 5), ("d", 6)))
rdd.sortByKey().collect.foreach(println)

上面代码将输出:

(a,1)
(b,3)
(c,2)
(d,6)
(e,5)
(f,4)

join

joinfullOuterJoinleftOuterJoinrightOuterJoin 都是针对 RDD[K,V]K 值相同的连接操作, 分别对应内连接、全连接、左连接、右连接。代码示例:

val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
val rdd2 = sc.parallelize(Seq(("b", 5), ("c", 6), ("d", 7), ("e", 8)))
rdd1.join(rdd2).collect.foreach(println)

上面代码将输出:

(d,(4,7))
(b,(2,5))
(c,(3,6))

cogroup

代码示例:

val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(4 to 6)
rdd1.cartesian(rdd2).collect.foreach(println)

上面代码将输出:

(1,4)
(1,5)
(1,6)
(2,4)
(2,5)
(2,6)
(3,4)
(3,5)
(3,6)

coalesce

coalesce 用来缩减分区,第二个可选参数 shuffle 是减少分区的过程中是否shuffle; true 为是, false 为否。默认为 false 。 代码示例:

val rdd1 = sc.parallelize(1 to 10, 4)
println("rdd1 NumPartitions: " + rdd1.getNumPartitions)
val rdd2 = rdd1.coalesce(2)
println("rdd2 NumPartitions: " + rdd2.getNumPartitions)

上面代码将输出:

rdd1 NumPartitions: 4
rdd2 NumPartitions: 2

repartition

代码示例:

val rdd1 = sc.parallelize(1 to 10, 2)
println("rdd1 NumPartitions: " + rdd1.getNumPartitions)
val rdd2 = rdd1.repartition(4)
println("rdd2 NumPartitions: " + rdd2.getNumPartitions)

上面代码将输出:

rdd12 NumPartitions: 2
rdd13 NumPartitions: 4

完整代码

package com.sparkexamples.spark.rdd

import org.apache.spark.sql.SparkSession

object RDDTransformationExamples {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SparkExample")
      .getOrCreate()

    val sc = spark.sparkContext

    val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))

    println("**** map example: ****")
    println(rdd1.map(x => 10 * x).collect.mkString(","))

    println("**** filter example: ****")
    println(rdd1.filter(x => x % 2 == 0).collect.mkString(","))
    println(rdd1.filter(x => x % 2 != 0).collect.mkString(","))

    println("**** flatMap example: ****")
    val fileRDD = sc.parallelize(List("this is the fist line", "this is the second line"))
    println(fileRDD.flatMap(x => x.split(" ")).collect.toSeq)

    println("**** mapPartitions example: ****")
    println(rdd1.mapPartitions(_.map(x => 10 * x)).collect.toSeq)

    println("**** mapPartitionsWithIndex example: ****")
    val rdd2 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10), numSlices = 4)
    println(rdd2.mapPartitionsWithIndex((index,items)=>(items.map(x=>(index,x)))).collect.mkString(" - "))

    println("**** sample example: ****")
    println(rdd1.sample(withReplacement = false, 0.3).collect.mkString(" "))

    println("**** union example: ****")
    println(rdd1.union(rdd2).collect.mkString(" "))

    println("**** intersection example: ****")
    println(rdd1.intersection(rdd2).collect.mkString(" "))

    println("**** distinct example: ****")
    val rdd3 = sc.parallelize(Seq(1,1,2,2,3,4))
    println(rdd3.distinct.collect.mkString(" "))

    val rdd4 = sc.parallelize(Seq(("a", 1), ("b", 2), ("b", 3), ("c", 4), ("c", 5), ("d", 6)))
    println("**** groupByKey example: ****")
    rdd4.groupByKey.collect.foreach(println)

    println("**** reduceByKey example: ****")
    rdd4.reduceByKey(_+_).collect.foreach(println)

    val rdd5 = sc.parallelize(Seq(("a", 1), ("c", 2), ("b", 3), ("f", 4), ("e", 5), ("d", 6)))
    println("**** sortByKey example: ****")
    rdd5.sortByKey().collect.foreach(println)

    val rdd6 = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
    val rdd7 = sc.parallelize(Seq(("b", 5), ("c", 6), ("d", 7), ("e", 8)))
    println("**** join example: ****")
    rdd6.join(rdd7).collect.foreach(println)

    println("**** cogroup example: ****")
    rdd6.cogroup(rdd7).collect.foreach(println)

    val rdd8 = sc.parallelize(1 to 3)
    val rdd9 = sc.parallelize(4 to 6)
    println("**** cartesian example: ****")
    rdd8.cartesian(rdd9).collect.foreach(println)

    println("**** coalesce example: ****")
    val rdd10 = sc.parallelize(1 to 10, 4)
    println("rdd10 NumPartitions: " + rdd10.getNumPartitions)
    val rdd11 = rdd10.coalesce(2)
    println("rdd11 NumPartitions: " + rdd11.getNumPartitions)

    println("**** repartition example: ****")
    val rdd12 = sc.parallelize(1 to 10, 2)
    println("rdd12 NumPartitions: " + rdd12.getNumPartitions)
    val rdd13 = rdd12.repartition(4)
    println("rdd13 NumPartitions: " + rdd13.getNumPartitions)

  }

}

在线查看

Spark RDD 行动算子(Actions)

正如转换算子中提到的,所有的转换算子都是惰性的,这意味着它们不会立即执行,只有在遇到行动算子时才会触发执行。

Spark中的行动算子分为两类:一类算子执行转换算子生成Scala集合或变量;另一类是将RDD保存到外部文件系统或者数据库中。

下面是一些常见的行动算子:

Action

说明

reduce(func)

使用函数 func 聚合RDD中的所有元素。该函数应该是可交换和关联的。

collect()

以数组形式返回所有的数据。

count()

返回 RDD 中元素个数。

first()

返回 RDD 中第一个元素。

take(n)

返回 RDD 中前 n 个元素组成的数据。

takeSample(withReplacement, num, [seed])

返回一个数组,该数组由从 RDD 中随机采样的 num 个元素组成返回。

takeOrdered(n, [ordering])

RDD 的自然顺序或自定义比较器返回 RDD 的前 n 个元素。

saveAsTextFile(path)

RDD 中的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用 toString 方法,将它装换为文件中的文本。

saveAsSequenceFile(path)

RDD 中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以是HDFS或者其他Hadoop支持的文件系统。

countByKey()

应用于 (K,V) 类型的 RDD ,返回一个 (K,Int) 的map,表示每一个key对应的元素个数。

foreach(func)

RDD 中的每一个元素上运行函数 func

示例

reduce

reduce 常用来整合RDD中所有数据,例如求和操作。 示例:

val rdd = sc.parallelize(1 to 10)
println(rdd.reduce((x, y) => x + y))
// 输出: 55

上面的函数部分也可简写为:

rdd.reduce(_+_)

collect

示例:

val rdd = sc.parallelize(1 to 10)
println(rdd.collect().mkString(" "))
// 输出: 1 2 3 4 5 6 7 8 9 10

Note

collect 应该在 filter 或其他操作后返回一个足够小的数据集时使用。 如果直接将整个数据集 collect 返回,这可能会使driver程序OOM。

count

示例:

val rdd = sc.parallelize(1 to 10)
println("elements number:" + rdd.count())
// 输出: elements number:10

first

示例:

val rdd = sc.parallelize(1 to 10)
println("the first element:" + rdd.first())
// 输出: the first element:1

take

示例:

val rdd = sc.parallelize(1 to 10)
println("the first two element:" + rdd.take(2).mkString(" "))
// 输出: the first two element:1 2

Note

仅当预期生成的数组很小时才应使用此方法,因为所有数据都将加载到driver内存中。

takeSample

take 方法,该方法仅在预期结果数组很小的情况下使用,因为所有数据都被加载到driver内存中。

语法:

takeSample(withReplacement: Boolean, num: Int, seed: Long)
  • withReplacement:元素是否可以多次抽样;

  • num:返回样本大小;

  • seed:随机数生成器的种子;

示例:

val rdd = sc.parallelize(1 to 10)
println("take two sample element:" + rdd1.takeSample(withReplacement = true,2).mkString(" "))
// 输出: the first two element:3 10

takeOrdered

take 方法,该方法仅在预期结果数组很小的情况下使用,因为所有数据都被加载到driver内存中。

示例:

val rdd = sc.parallelize(1 to 10)
println("take two element with order:" + rdd.takeOrdered(2).mkString(" "))
// 输出: take two element with order:1 2

saveAsTextFile

示例:

val rdd = sc.parallelize(1 to 10)
rdd1.saveAsTextFile("path/data")

countByKey

示例:

val rdd = sc.parallelize(Seq("a", "b", "b", "c", "c", "c", "d"))
println(rdd.map(x => (x, 1)).countByKey())
// 输出: Map(d -> 1, a -> 1, b -> 2, c -> 3)

完整代码

package com.sparkexamples.spark.rdd

import org.apache.spark.sql.SparkSession

object RDDActionExamples {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SparkExample")
      .getOrCreate()

    val sc = spark.sparkContext

    val rdd1 = sc.parallelize(1 to 10)

    println("**** reduce example: ****")
    println(rdd1.reduce((x, y) => x + y))

    println("**** collect example: ****")
    println(rdd1.collect().mkString(" "))

    println("**** count example: ****")
    println("elements number:" + rdd1.count())

    println("**** first example: ****")
    println("the first element:" + rdd1.first())

    println("**** take example: ****")
    println("the first two element:" + rdd1.take(2).mkString(" "))

    println("**** takeSample example: ****")
    println("take two sample element:" + rdd1.takeSample(withReplacement = true,2).mkString(" "))

    println("**** takeOrdered example: ****")
    println("take two element with order:" + rdd1.takeOrdered(2).mkString(" "))

    println("**** saveAsTextFile example: ****")
    rdd1.saveAsTextFile("path/data")

    println("**** saveAsTextFile example: ****")
    val rdd2 = sc.parallelize(Seq("a", "b", "b", "c", "c", "c", "d"))
    println(rdd2.map(x => (x, 1)).countByKey())

  }

}

在线查看

Spark RDD — Stage

Spark Stage是计算多个任务的物理执行单元。 Spark处理数据时会根据RDD的依赖关系构建一个有向无环图(DAG),然后根据DAG划分Stage, 在运行时按照步骤执行DAG中的Stage。

DAG

有向无环图:DAG(Directed Acyclic Graph)是什么?:

  • Directed:节点间由边连接在一起,连接顺序由RDD上的调用顺序决定。

  • Acyclic:节点不能连接成环;即如果操作或转换一旦完成,则无法还原回其原始值。

  • Graph:Graph指由边和点排列形成的特定图形。点为RDD,边为RDD上调用的算子。

DAGScheduler将Stage划分为多个任务,然后将Stage信息传递给集群管理器,集群管理器触发任务调度器运行任务。 Spark驱动程序将逻辑执行计划转换为物理执行计划。Spark作业以流水线方法执行。

WordCount例子

下面是一个Spark入门级的程序WordCount:

val sc = spark.sparkContext
sc.textFile("src/main/resources/test.txt")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_+_)
  .saveAsTextFile("src/main/resources/result")

程序解释:

  • textFile(...): 读取本地文本文件 test.txt

  • flatMap(...): 将文件中的内容拆分成每个词 ;

  • map(...): 将每个词处理成 (word, 1) ,方便后续计数统计 ;

  • reduceByKey(...): 按 keyvalue 累加得到每个 key 的数量 ;

  • saveAsTextFile(...): 将计算结果写入到本地目的 result

Spark应用程序其实就是基于RDD的一系列计算操作,可以在SparkUI上看到如下DAG图:

Word Count Stage

WordCount DAG

从图中可以出,WordCount程序中的5个操作算子被分成了两个Stage。

RDD依赖关系

Spark中每个转换算子都会生成一个新的RDD,这样在一系列RDD中就形成了一种前后依赖关系。 RDD中的依赖关系分为窄依赖(Narrow Dependency)与宽依赖(Wide Dependency),下图展示了两种依赖之间的区别:

Narrow And Wide Dependency

宽依赖和窄依赖

  • 窄依赖:父RDD的分区最多被子RDD的一个分区使用。从子RDD角度看,它精确知道依赖的上级RDD;

  • 宽依赖:父RDD的一个分区会被子RDD的多个分区使用。从子RDD角度看,每个分区都要依赖于父RDD的所有分区;

总体而言,如果父RDD的一个分区只被一个子RDD的一个分区所使用就是窄依赖, 否则就是宽依赖。 窄依赖典型的算子包括 mapfilterunion 等,宽依赖典型的算子包括 groupByKeysortByKey 等。对于连接( join )操作,分为两种情况:

  • 对输入进行协同划分(co-partitioned),属于窄依赖(如上图左所示侧)。所谓协同划分是指多个父RDD的每个分区的所有”键(key)“,落在子RDD的同一个分区上,不会产生同一个父RDD的某一分区,落在子RDD的两个分区上的情况。

  • 对输入非协同划分(not co-partitioned),属于宽依赖,如上图右侧所示。

划为宽依赖和窄依赖有以下两个好处:

  • 第一,窄依赖可以在单个集群节点中流水线式执行。比如,我们可以将元素应用了 map 操作后紧接着应用 filter 操作。与此相反,宽依赖需要父亲RDD的所有分区数据都准备好,并且通过类似于MapReduce的操作将数据在不同的节点之间进行重新洗牌(Shuffle)和网络传输。

  • 第二,窄依赖从失败节点中恢复是非常高效的,因为只需要重新计算相对应的父分区数据就可以,而且这个重新计算是在不同的节点并行重计算的,与此相反,在一个含有宽依赖的DAG中,某个节点失败导致一些分区数据的丢失,但是我们需要重新计算父RDD的所有分区的数据。

Stage划分

Spark通过分析各个RDD的依赖关系生成了DAG,再通过分析各个RDD的依赖关系来决定如何划分Stage, 具体划分方法是:从DAG图末端出发,逆向遍历整个依赖链,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的Stage中,Stage中task数目由Stage末端的RDD分区个数来决定;

Stage Example

Stage划分

将窄依赖尽量划分在同一个Stage中,以实现流水线式计算。例如在上面的WordCount例子中,textFileflatMapmap 被划分为Stage0,由于mapreduceByKey为宽依赖,所以reduceByKeysaveAsTextFile被划分为Stage1。

Spark RDD 持久化

Spark可以将RDD持久化到内存或磁盘文件系统中,把RDD持久化到内存中可以极大地提高迭代计算以及各计算模型之间的数据共享。

Spark的开发调优有一个原则,即对多次使用的RDD进行持久化(或称缓存)。如果要对一个RDD进行持久化,只要对这个RDD调用 cachepersist 即可。

cache方法

语法:

cache(): this.type = persist()

cache() 方法使用非序列化的方式直接将RDD的数据全部尝试持久化到内存中, cachepersist 的一个特例。

persist方法

语法:

persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
persist(newLevel: StorageLevel): this.type

persist() 方法可以手动传入一个持久化级别进行持久化,无参调用时等同于 cache()

持久化级别

persist() 方法可以传入一个 StorageLevel(持久化级别),当 StorageLevelMEMORY_ONLY 时就是 cache()

所有支持的 StorageLevel 列表定义在 StorageLevel 的伴生对象中,如下:

/**
 * Various StorageLevel defined and utility functions for creating new storage levels.
 */
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}

StorageLevel 类的五个初始化参数为:

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)

因此可以将上面的持久化级别整理为:

StorageLevel

磁盘

内存

OffHeap

反序列化

副本

NONE

1

DISK_ONLY

2

DISK_ONLY_2

1

MEMORY_ONLY

1

MEMORY_ONLY_2

2

MEMORY_ONLY_SER

1

MEMORY_ONLY_SER_2

2

MEMORY_AND_DISK

1

MEMORY_AND_DISK_2

2

MEMORY_AND_DISK_SER

1

MEMORY_AND_DISK_SER_2

1

OFF_HEAP

1

例如:

  • MEMORY_ONLY:将RDD以未序列化的Java对象格式储存在内存中。如果内存不够存放所有的数据,则某些分区将不会被缓存。

  • MEMORY_AND_DISK:将RDD以未序列化的Java对象格式存储在内存中。如果内存不够存放所有的数据,则将剩余存储在磁盘中。

  • MEMORY_ONLY_SER / MEMORY_AND_DISK_SER:基本含义同 MEMORY_ONLY / MEMORY_AND_DISK 。唯一的区别是,它会将RDD中的数据进行序列化,这样更加节省内存,从而可以避免持久化的数据占用过多内存导致GC。

  • DISK_ONLY:将RDD以未序列化的Java对象格式写入磁盘文件中。

例子

这里以搜狗日志数据为例,计算用户一天的搜索条数,数据从搜狗实验室(http://www.sogou.com/labs/) 下载

将下载后的数据包解压上传至HDFS:

hdfs dfs -put SogouQ.reduced /data 

然后在spark-shell中执行:

scala> val sc = spark.sparkContext
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@70cc0601

scala> val fileRdd = sc.textFile("hdfs://127.0.0.1:9000/data/SogouQ.reduced", minPartitions = 1)
fileRdd: org.apache.spark.rdd.RDD[String] = hdfs://127.0.0.1:9000/data/SogouQ.reduced MapPartitionsRDD[1] at textFile at <console>:25

scala> fileRdd.cache()
res0: fileRdd.type = hdfs://127.0.0.1:9000/data/SogouQ.reduced MapPartitionsRDD[1] at textFile at <console>:25

scala> fileRdd.count()
res1: Long = 1724264

scala> fileRdd.count()
res2: Long = 1724264

在代码中,第一次 count 时会从HDFS中读取数据,然后将这些数据缓存在内存中;第二次 count 时就直接从内存中读取数据。

可以在UI界面看到,第一次 count 耗时2s,第二次 count 只耗时62ms。

RDD Cache Example

Spark — 共享变量

概述

在默认情况下,Spark在集群中多个节点运行一个函数时,它会把函数中涉及到的每个变量先创建一份副本,在副本上进行运算。但是有时候需要在多个任务之间共享变量, 或者在任务(Task)和Driver程序之间共享变量。为了满足这种需求,Spark提供了两种类型的变量:广播变量(broadcast variables)和累加器(accumulators)。

广播变量用来把变量在所有节点的内存之间进行共享。累加器则支持在不同节点之间进行累加计算(比如计数或者求和)。

累加器

累加器是仅仅被相关操作累加的变量,通常用来实现计数器(counter)和求和(sum)。如前面提到在各任务中同一函数中的变量是不共享的,在不使用累加器的情况下, 程序可能不会返回我们期望的值,例如:

var counter = 0
val rdd = sc.parallelize(Seq(1,2,3,4,5))
rdd.foreach(x => counter += x)
println(counter)
// 输出 0

上面代码并没有输出我们期望的 15,变量 counter 定义在Driver程序中,在执行时被累加的其实 counter 的副本,最后累加结果也不会更新到Driver程序的 counter 上,所以最后输出的值仍还是 0

下面我们使用累加器来稍微修改上面的程序:

val counter = sc.longAccumulator("counter")
val rdd = sc.parallelize(Seq(1,2,3,4,5))
rdd.foreach(x => counter.add(x))
println(counter.value)
// 输出 15

这样使用累加器 longAccumulator 程序便返回了我们期望的值。

SparkContext中提供了三种类型的累加器: longAccumulatordoubleAccumulatorcollectionAccumulator,在创建时可以传入一个 name 参数,这样方便在Spark UI界面中查看。

累加器也是惰性计算的,只有在RDD进行Action操作时才会被更新,在遇到如Map等惰性操作时不会被立即计算,例如:

val counter = sc.longAccumulator("counter")
val rdd = sc.parallelize(Seq(1,2,3,4,5))
rdd.map{x => counter.add(x); x}
println(counter.value)
// 输出 0

广播变量

What is Apache Spark?

Apache Spark is an Open source analytical processing engine for large scale powerful distributed data processing and machine learning applications. Spark is Originally developed at the University of California, Berkeley’s, and later donated to Apache Software Foundation. In February 2014, Spark became a Top-Level Apache Project and has been contributed by thousands of engineers and made Spark as one of the most active open-source projects in Apache.

Apache Spark Features

  • In-memory computation

  • Distributed processing using parallelize

  • Can be used with many cluster managers (Spark, Yarn, Mesos e.t.c)

  • Fault-tolerant

  • Immutable

  • Lazy evaluation

  • Cache & persistence

  • Inbuild-optimization when using DataFrames

  • Supports ANSI SQL