使用共享变量-广播变量
默认情况下,当Spark作为不同节点上的一组任务并行运行一个函数时,它会将函数中使用的每个变量的副本发送给每个任务。有时候,一个变量需要在任务之间共享,或者在任务和驱动程序之间共享。
Spark支持两种类型的共享变量:广播变量(broadcast variable)和累加器(accumulator),广播变量可用于在所有节点的内存中缓存一个值,累加器是只“添加”到其中的变量,比如计数器和sum。
广播变量和累加器能够维护一个全局状态,或者在Spark程序中的任务和分区之间共享数据。
广播变量
广播变量允许程序员在每台机器上保持一个缓存的只读变量,而不是将其副本与任务一起发送。例如,可以使用它们以有效的方式为每个节点提供一个大型输入数据集的副本。Spark还尝试使用高效的广播算法来分发广播变量,以降低通信成本。
广播变量可以从整个集群中共享和访问,但它们不能被executors修改。驱动程序创建一个广播变量,executors读取它。
如果有大量的数据,而这些数据是大多数executors所需要的,那么应该使用广播变量。
当不再需要广播变量时,可以销毁它。所有关于它的信息都将被删除(从executors和驱动程序),并且该变量将不可用。如果试图在调用destroy之后访问它,将抛出一个异常。
另一种方法是调用unpersist,它只从executors的缓存中删除变量值。如果尝试在unpersist之后使用它,它将再次被发送到executors。
下面是广播变量的使用示例。
val broads = sc.broadcast(3) //创建广播变量,变量可以是任意类型 val lists = List(1,2,3,4,5) // 创建一个测试的List val listRDD = sc.parallelize(lists) // 构造一个rdd val results = listRDD.map(x => x * broads.value) //map操作数据 println("结果是:") results.collect.foreach(println) // 遍历结果
执行以上代码,输出结果如下:
3 6 9 12 15