Spark2.x入門:共享變量類型:廣播變量和累加器(下)

如果您覺得“大數據開發運維架構”對你有幫助,歡迎轉發朋友圈



概述:

上篇文章我們講了“Spark2.x進階:深入理解Spark中的閉包”,我建議你看本篇文章之前先看上一篇關於閉包的講解,上篇文章中sum求和後是0,而不是期望的15。如何解決閉包問題呢?如果RDD想操作一個作用域外的變量,一般都是用共享變量來實現,之前也寫廣播變量的用法,這裡講一下累加器的用法。


為什麼要定義累加器?

在 Spark 應用程序中,我們經常會有這樣的需求,如要需要統計符合某種特性數據的總數,這種需求都需要用到計數器。如果一個變量不被聲明為一個累加器,那麼它將在被改變時不會在 driver 端進行全局彙總,即在分佈式運行時每個 task 運行的只是原始變量的 一個副本,並不能改變原始變量的值,但是當這個變量被聲明為累加器後,該變量就會有分佈式計數的功能。

這裡還是跟上篇文章類似的代碼,只不過定義了一個累加器sum,而不是普通變量,實例實例代碼如下:

<code>package com.hadoop.ljs.spark220.studyimport org.apache.spark.{SparkConf, SparkContext}/**  * @author: Created By lujisen  * @company ChinaUnicom Software JiNan  * @date: 2020-02-20 19:36  * @version: v1.0  * @description: com.hadoop.ljs.spark220.study  */object AccumlatorTest {  def main(args: Array[String]): Unit = {    val sparkConf=new SparkConf().setMaster("local[*]").setAppName("AccumlatorTest")    val sc=new SparkContext(sparkConf)    /*定義一個共享變量:累加器*/    val sum=sc.accumulator(0)    /*輸入數據*/    val rdd1=sc.parallelize(List(1,2,3,4,5))    /*求和 ,然後各個元素加1*/    val rdd2=rdd1.map(x=>{      sum+=x      x    })    /*這裡是個action操作 沒有這個操作,程序不會執行*/    rdd2.collect()    println("求和:"+sum)    sc.stop()  }}/<code>

運行結果如下,sum=15,符合我們的期望值:

Spark2.x入門:共享變量類型:廣播變量和累加器(下)


結合上面的代碼說一下累加器的執行過程:


1).Accumulator需要在Driver進行定義和並初始化,並進行註冊,同時Accumulator首先需要在Driver進行序列化,然後發送到Executor端;另外,Driver接收到Task任務完成的狀態更新後,會去更新Value的值,然後在Action操作執行後就可以獲取到Accumulator的值了。


2).Executor接收到Task之後會進行反序列化操作,反序列化得到RDD和function,同時在反序列化的同時也去反序列化Accumulator,同時也會向TaskContext完成註冊,完成任務計算之後,隨著Task結果一起返回給Driver端進行處理。


這裡有執行過程圖可以參考下:

Spark2.x入門:共享變量類型:廣播變量和累加器(下)


累加器特性:

1.累加器也是也具有懶加載屬性,只有在action操作執行時,才會強制觸發計算求值;

2.累加器的值只可以在Driver端定義初始化,在Executor端更新,不能在Executor端進行定義初始化,不能在Executor端通過[.value]獲取值,任何工作節點上的Task都不能訪問累加器的值;

3.閉包裡的執行器代碼可以使用累加器的 += 方法(在Java中是 add )增加累加器的值。



特別提醒:

累加器在Driver端定義賦初始值,累加器只能在Driver端讀取最後的值,在Excutor端更新。


分享到:


相關文章: