MapReduce 運行全過程解析

MapReduce 的運行過程。

前言

前面我們講了 MapReduce 的編程模型,我們知道他主要分成兩大階段來完成一項任務,一是 map 階段對我們的數據進行分開計算,第二是 reduce 階段,對 map 階段計算產生的結果再進行彙總。

還寫了一個非常經典的,類似於Java 中 HelloWorld 一樣的 WordCount 代碼。今天我們就根據這個代碼來闡述整個 MapReduce 的運行過程。

先苦口婆心的告訴你,這個知識點是非常非常非常之重要,之前面的 5 家公司,有 3 家公司都問了這個過程,另外兩家問了 Yarn 的運行機制,這是後面會去講的內容,你必須得懂大體的流程是怎麼樣子,如果能去研究搞清楚每個細節,那當然最好的。

從數據進如到處理程序到處理完成後輸出到存儲中,整個過程我們大體分為如下 5 個階段:

  • Input Split 或 Read 數據階段: Input Split,是從數據分片出發,把數據輸入到處理程序中。Read 則是從處理程序出發反向來看,把數據從文件中讀取到處理程序中來。這個階段表達的是我們數據從哪裡來。這是整個過程的開始。
  • Map階段: 當數據輸入進來以後,我們進行的是 map 階段的處理。例如對一行的單詞進行分割,然後每個單詞進行計數為 1 進行輸出。
  • Shuffle 階段: Shuffle 階段是整個 MapReduce 的核心,介於 Map 階段跟 Reduce 階段之間。在 Spark 中也有這個概念,可以說你理解了這個概念,到時候再學習其他的大數據計算框架原理的時候,會給你帶來非常大的幫助,因為他們大多理念是相同的,下面會重點講解這個過程。
  • Reduce 階段: 數據經過 Map 階段處理,數據再經過 Shuffle 階段,最後到 Reduce ,相同的 key 值的數據會到同一個 Reduce 任務中進行最後的彙總。
  • Output 階段: 這個階段的事情就是將 Reduce 階段計算好的結果,存儲到某個地方去,這是整個過程的結束。

整個執行流程圖

一圖勝千言:


MapReduce 運行全過程解析


如果看不清晰,我上傳了一份完整的在 gayHub 上面,地址:https://raw.githubusercontent.com/heyxyw/bigdata/master/bigdatastudy/doc/img/mapreduce/mr-Implementation-process.png

當然了,不太瞭解或者剛接觸可能一開始看比較懵逼,我剛開始也是。下面我們就一塊一塊的來拆分講解,最後差不多就明白了。

Input Split 數據階段

Input Split 顧明思議,輸入分片 ,為什麼我們會叫 輸入分片呢?因為數據在進行 Map 計算之前,MapReduce 會根據輸入文件進行切分,因為我們需要分佈式的進行計算嘛,那麼我得計算出來我的數據要切成多少片,然後才好去對每片數據分配任務去處理。

每個輸入分片會對應一個 Map 任務,輸入分片存儲的並非數據本身,而是一個分片長度和一個記錄數據的位置數據,它往往是和 HDFS 的 block(塊) 進行關聯的。

假如我們設定每個 HDFS 的塊大小是 128M,如果我們現在有3個文件,大小分別是 10M,129M,200M,那麼MapReduce 對把 10M 的文件分為一個分片,129M 的數據文件分為2個分片,200M 的文件也是分為兩個分片。那麼此時我們就有 5 個分片,就需要5個 Map 任務去處理,而且數據還是不均勻的。

如果有非常多的小文件,那麼就會產生大量的 Map 任務,處理效率是非常低下的。

這個階段使用的是 InputFormat 組件,它是一個接口 ,默認使用的是 TextInputFormat 去處理,他會調用 readRecord() 去讀取數據。

這也是MapReduce 計算優化的一個非常重要的一個點,面試被考過。如何去優化這個小文件的問題呢?

  • 最好的辦法:在數據處理系統的最前端(預處理、採集),就將小文件先進行合併了,再傳到 HDFS 中去。
  • 補救措施:如果已經存在大量的小文件在HDFS中了,可以使用另一種 InputFormat 組件CombineFileInputFormat 去解決,它的切片方式跟 TextInputFormat 不同,它會將多個小文件從邏輯上規劃到一個切片中,這樣,多個小文件就可以交給一個 Map 任務去處理了。

Map階段

Map 階段就是我們編寫好的 map 函數了,在 WordCount 示例中執行的就是對輸入的每一行數據進行切分,然後把單詞跟計數一起發送出去,類似於

Map 階段一般在存儲數據的節點上運行。為什麼是在數據存儲的節點呢?因為移動數據的代價比較高,移動數據不如移動計算。

Shuffle 階段

將 Map 階段的輸出作為 Reduce 階段的輸入的過程就是 Shuffle 。 這也是整個 MapReduce 中最重要的一個環節。

一般MapReduce 處理的都是海量數據,Map 輸出的數據不可能把所有的數據都放在內存中, 當我們在map 函數中調用 context.write() 方法的時候,就會調用 OutputCollector 組件把數據寫入到處於內存中的一個叫環形緩衝區的東西。

環形緩衝區默認大小是 100M ,但是隻寫80%,同時map還會為輸出操作啟動一個守護線程,當到數據達到80%的時候,守護線程開始清理數據,把數據寫到磁盤上,這個過程叫 spill 。

數據在寫入環形緩衝區的時候,數據會默認根據key 進行排序,每個分區的數據是有順序的,默認是 HashPartitioner。當然了,我們也可以去自定義這個分區器。

每次執行清理都產生一個文件,當 map 執行完成以後,還會有一個合併文件文件的過程,其實他這裡跟 Map 階段的輸入分片(Input split)比較相似,一個 Partitioner 對應一個 Reduce 作業,如果只有一個 reduce 操作,那麼 Partitioner 就只有一個,如果有多個 reduce 操作,那麼 Partitioner 就有多個。Partitioner 的數量是根據 key 的值和 Reduce 的數量來決定的。可以通過 job.setNumReduceTasks() 來設置。

這裡還有一個可選的組件 Combiner ,溢出數據的時候如果調用 Combiner 組件,它的邏輯跟 reduce 一樣,相同的key 先把 value 進行相加,前提是合併並不會改變業務,這樣就不糊一下傳輸很多相同的key 的數據,從而提升效率。

舉個例子,在溢出數據的時候,默認不使用 Combiner,數據是長這樣子:

Reduce 階段

在執行 Reduce 之前,Reduce 任務會去把自己負責分區的數據拉取到本地,還會進行一次歸併排序並進行合併。

Reduce 階段中的 reduce 方法,也是我們自己實現的邏輯,跟Map 階段的 map 方法一樣,只是在執行 reduce 函數的時候,values 為 同一組 key 的value 迭代器。在 wordCount 的例子中,我們迭代這些數據進行疊加。最後調用 context.write 函數,把單詞和總數進行輸出。

Output 階段

在 reduce 函數中調用 context.write 函數時,會調用 OutPutFomart 組件,默認實現是 TextOutPutFormat ,把數據輸出到目標存儲中,一般是 HDFS。

擴展

上面我們只是講解了大體的流程,這裡給大家拋幾個問題?也是面試中經常被問到的

  1. 文件切分是怎麼切的?一個文件到底會切成幾分?算法是怎麼樣的?
  2. Map 任務的個數是怎麼確定的?

上面的問題,給大家貼兩個鏈接:

MapReduce Input Split(輸入分/切片)詳解:https://blog.csdn.net/dr_guo/article/details/51150278

源碼解析 MapReduce作業切片(Split)過程:https://blog.csdn.net/u010010428/article/details/51469994

總結

MapReduce 的執行流程到這裡就大致講解完成了,希望你也能畫出來上面的大圖。能夠理解到大體的流程,並能掌握關鍵的環節 Shuffle 。以後你還會在其他的大數據組件上聽到這個詞。

後面將給大家帶來 Yarn 的大致運行機制,然後再為大家講解 WordCount 運行的整個過程。

敬請期待。


分享到:


相關文章: