Spark — 共享变量¶
概述¶
在默认情况下,Spark在集群中多个节点运行一个函数时,它会把函数中涉及到的每个变量先创建一份副本,在副本上进行运算。但是有时候需要在多个任务之间共享变量, 或者在任务(Task)和Driver程序之间共享变量。为了满足这种需求,Spark提供了两种类型的变量:广播变量(broadcast variables)和累加器(accumulators)。
广播变量用来把变量在所有节点的内存之间进行共享。累加器则支持在不同节点之间进行累加计算(比如计数或者求和)。
累加器¶
累加器是仅仅被相关操作累加的变量,通常用来实现计数器(counter)和求和(sum)。如前面提到在各任务中同一函数中的变量是不共享的,在不使用累加器的情况下, 程序可能不会返回我们期望的值,例如:
var counter = 0
val rdd = sc.parallelize(Seq(1,2,3,4,5))
rdd.foreach(x => counter += x)
println(counter)
// 输出 0
上面代码并没有输出我们期望的 15
,变量 counter
定义在Driver程序中,在执行时被累加的其实 counter
的副本,最后累加结果也不会更新到Driver程序的 counter
上,所以最后输出的值仍还是 0
。
下面我们使用累加器来稍微修改上面的程序:
val counter = sc.longAccumulator("counter")
val rdd = sc.parallelize(Seq(1,2,3,4,5))
rdd.foreach(x => counter.add(x))
println(counter.value)
// 输出 15
这样使用累加器 longAccumulator
程序便返回了我们期望的值。
SparkContext中提供了三种类型的累加器: longAccumulator
、 doubleAccumulator
、 collectionAccumulator
,在创建时可以传入一个 name
参数,这样方便在Spark UI界面中查看。
累加器也是惰性计算的,只有在RDD进行Action操作时才会被更新,在遇到如Map等惰性操作时不会被立即计算,例如:
val counter = sc.longAccumulator("counter")
val rdd = sc.parallelize(Seq(1,2,3,4,5))
rdd.map{x => counter.add(x); x}
println(counter.value)
// 输出 0