Spark — 共享变量

概述

在默认情况下,Spark在集群中多个节点运行一个函数时,它会把函数中涉及到的每个变量先创建一份副本,在副本上进行运算。但是有时候需要在多个任务之间共享变量, 或者在任务(Task)和Driver程序之间共享变量。为了满足这种需求,Spark提供了两种类型的变量:广播变量(broadcast variables)和累加器(accumulators)。

广播变量用来把变量在所有节点的内存之间进行共享。累加器则支持在不同节点之间进行累加计算(比如计数或者求和)。

累加器

累加器是仅仅被相关操作累加的变量,通常用来实现计数器(counter)和求和(sum)。如前面提到在各任务中同一函数中的变量是不共享的,在不使用累加器的情况下, 程序可能不会返回我们期望的值,例如:

var counter = 0
val rdd = sc.parallelize(Seq(1,2,3,4,5))
rdd.foreach(x => counter += x)
println(counter)
// 输出 0

上面代码并没有输出我们期望的 15,变量 counter 定义在Driver程序中,在执行时被累加的其实 counter 的副本,最后累加结果也不会更新到Driver程序的 counter 上,所以最后输出的值仍还是 0

下面我们使用累加器来稍微修改上面的程序:

val counter = sc.longAccumulator("counter")
val rdd = sc.parallelize(Seq(1,2,3,4,5))
rdd.foreach(x => counter.add(x))
println(counter.value)
// 输出 15

这样使用累加器 longAccumulator 程序便返回了我们期望的值。

SparkContext中提供了三种类型的累加器: longAccumulatordoubleAccumulatorcollectionAccumulator,在创建时可以传入一个 name 参数,这样方便在Spark UI界面中查看。

累加器也是惰性计算的,只有在RDD进行Action操作时才会被更新,在遇到如Map等惰性操作时不会被立即计算,例如:

val counter = sc.longAccumulator("counter")
val rdd = sc.parallelize(Seq(1,2,3,4,5))
rdd.map{x => counter.add(x); x}
println(counter.value)
// 输出 0

广播变量