hdfs寫入流程解析

流程

hdfs寫入流程解析

寫入本地file文件,假設文件200M,則共有2個塊,block1為128M(hdfs默認塊大小為128M),block2為72M。默認三個副本。

1、ClientNode向HDFS寫入數據,先調用DistributedFileSystem的 create 方法獲取FSDataOutputStream。

2、DistributedFileSystem調用NameNode的 create 方法,發出文件創建請求。NameNode對待上傳文件名稱和路徑做檢驗,如上傳文件是否已存在同名目錄,文件是否已經存在,遞歸創建文件的父目錄(如不存在)等。並將操作記錄在edits文件中。

3、ClientNode調用FSDataOutputStream向輸出流輸出數據(假設先寫block1)。

4、FSDataOutputStream調用NameNode的 addBlock 方法申請block1的blockId和block要存儲在哪幾個DataNode(假設DataNode1,DataNode2和DataNode3)。若pipeline還沒有建立,則根據位置信息建立pipeline。

5、同返回的第一個DataNode節點DataNode1建立socket連接,向其發送package。同時,此package會保存一份到ackqueue確認隊列中。寫數據時先將數據寫到一個校驗塊chunk中,寫滿512字節,對chunk計算校驗和checksum值(4字節)。以帶校驗和的checksum為單位向本地緩存輸出數據(本地緩存佔9個chunk),本地緩存滿了向package輸入數據,一個package佔64kb。當package寫滿後,將package寫入dataqueue數據隊列中。將package從dataqueue數據對列中取出,沿pipeline發送到DataNode1,DataNode1保存,然後將package發送到DataNode2,DataNode2保存,再向DataNode3發送package。DataNode3接收到package,然後保存。

6、package到達DataNode3後做校驗,將校驗結果逆著pipeline回傳給ClientNode。DataNode3將校驗結果傳給DataNode2,DataNode2做校驗後將校驗結果傳給DataNode1,DataNode1做校驗後將校驗結果傳給ClientNode。ClientNode根據校驗結果判斷,如果”成功“,則將ackqueue確認隊列中的package刪除;如果”失敗“,則將ackqueue確認隊列中的package取出,重新放入到dataqueue數據隊列末尾,等待重新沿pipeline發送。

7、當block1的所有package發送完畢。即DataNode1、DataNode2和DataNode3都存在block1的完整副本,則三個DataNode分別調用NameNode的 blockReceivedAndDeleted方法。NameNode會更新內存中DataNode和block的關係。ClientNode關閉同DataNode建立的pipeline。文件仍存在未發送的block2,則繼續執行4。直到文件所有數據傳輸完成。

8、全部數據輸出完成,調用FSDataOutputStream的 close 方法。

9、ClientNode調用NameNode的 complete 方法,通知NameNode全部數據輸出完成。

容錯

假設當前構建的pipeline是DataNode1、DataNode2和DataNode3。當數據傳輸過程中,DataNode2中斷無法響應,則當前pipeline中斷,需要重建。

  1. 先將ackqueue中的所有package取出放回到dataqueue末尾。
  2. ClientNode調用NameNode的 updateBlockForPipeline 方法,為當前block生成新的版本,如ts1(本質是時間戳),然後將故障DataNode2從pipeline中刪除。
  3. FSDataOutputStream調用NameNode的 getAdditionalDataNode 方法,由NameNode分配新的DataNode,假設是DataNode4。
  4. FSDataOutputStream把DataNode1、DataNode3和DataNode4建立新的pipeline,DataNode1和DataNode3上的block版本設置為ts1,通知DataNode1或DataNode3將block拷貝到DataNode4。
  5. 新的pipeline創建好後,FSDataOutputStream調用NameNode的 updataPipeline 方法更新NameNode元數據。之後,按照正常的寫入流程完成數據輸出。
  6. 後續,當DataNode2從故障中恢復。DataNode2向NameNode報送所有block信息,NameNode發現block為舊版本(非ts1),則通過DataNode2的心跳返回通知DataNode2將此舊版本的block刪除。


分享到:


相關文章: