如果您覺得“大數據開發運維架構”對你有幫助,歡迎轉發朋友圈
概述:
上篇文章我們講了“Spark2.x進階:深入理解Spark中的閉包”,我建議你看本篇文章之前先看上一篇關於閉包的講解,上篇文章中sum求和後是0,而不是期望的15。如何解決閉包問題呢?如果RDD想操作一個作用域外的變量,一般都是用共享變量來實現,之前也寫廣播變量的用法,這裡講一下累加器的用法。
為什麼要定義累加器?
在 Spark 應用程序中,我們經常會有這樣的需求,如要需要統計符合某種特性數據的總數,這種需求都需要用到計數器。如果一個變量不被聲明為一個累加器,那麼它將在被改變時不會在 driver 端進行全局彙總,即在分佈式運行時每個 task 運行的只是原始變量的 一個副本,並不能改變原始變量的值,但是當這個變量被聲明為累加器後,該變量就會有分佈式計數的功能。
這裡還是跟上篇文章類似的代碼,只不過定義了一個累加器sum,而不是普通變量,實例實例代碼如下:
<code>package com.hadoop.ljs.spark220.studyimport org.apache.spark.{SparkConf, SparkContext}/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-02-20 19:36 * @version: v1.0 * @description: com.hadoop.ljs.spark220.study */object AccumlatorTest { def main(args: Array[String]): Unit = { val sparkConf=new SparkConf().setMaster("local[*]").setAppName("AccumlatorTest") val sc=new SparkContext(sparkConf) /*定義一個共享變量:累加器*/ val sum=sc.accumulator(0) /*輸入數據*/ val rdd1=sc.parallelize(List(1,2,3,4,5)) /*求和 ,然後各個元素加1*/ val rdd2=rdd1.map(x=>{ sum+=x x }) /*這裡是個action操作 沒有這個操作,程序不會執行*/ rdd2.collect() println("求和:"+sum) sc.stop() }}/<code>
運行結果如下,sum=15,符合我們的期望值:
結合上面的代碼說一下累加器的執行過程:
1).Accumulator需要在Driver進行定義和並初始化,並進行註冊,同時Accumulator首先需要在Driver進行序列化,然後發送到Executor端;另外,Driver接收到Task任務完成的狀態更新後,會去更新Value的值,然後在Action操作執行後就可以獲取到Accumulator的值了。
2).Executor接收到Task之後會進行反序列化操作,反序列化得到RDD和function,同時在反序列化的同時也去反序列化Accumulator,同時也會向TaskContext完成註冊,完成任務計算之後,隨著Task結果一起返回給Driver端進行處理。
這裡有執行過程圖可以參考下:
累加器特性:
1.累加器也是也具有懶加載屬性,只有在action操作執行時,才會強制觸發計算求值;
2.累加器的值只可以在Driver端定義初始化,在Executor端更新,不能在Executor端進行定義初始化,不能在Executor端通過[.value]獲取值,任何工作節點上的Task都不能訪問累加器的值;
3.閉包裡的執行器代碼可以使用累加器的 += 方法(在Java中是 add )增加累加器的值。
特別提醒:
累加器在Driver端定義賦初始值,累加器只能在Driver端讀取最後的值,在Excutor端更新。
閱讀更多 JasonLu1986 的文章