Hadoop幾個常見類的作用

1、自定義bean中的CopmareTo()

public class KeyPair implements WritableComparable {

private int year;

private int hot;

@Override

/**

* 定義CopmareTo()是在溢出和merge時用來來排序的

*/

public int compareTo(KeyPair o) {

int res = Integer.compare(year, o.getYear());

if (res != 0) {

return res;

}

return Integer.compare(hot, o.getHot());

}

@Override

public void write(DataOutput dataOutput) throws IOException {

dataOutput.write(year);

dataOutput.write(hot);

}

@Override

public void readFields(DataInput dataInput) throws IOException {

this.year = dataInput.readInt();

this.hot = dataInput.readInt();

}

@Override

public int hashCode() {

return (year + "/" + hot).hashCode();

}

@Override

public boolean equals(Object obj) {

KeyPair k = (KeyPair) obj;

int res1 = Integer.compare(year, k.getYear());

int res2 = Integer.compare(hot, k.getHot());

return (res1 == 0 && res2 == 0) ? true : false;

}

@Override

public String toString() {

return new ToStringBuilder(this)

.append("year", year)

.append("hot", hot)

.toString();

}

get和set方法...

}

2、job.setGroupingComparatorClass(ItemidGroupingComparator.class)

GroupingComparator是在reduce階段分組來使用的,由於reduce階段,如果key相同的一組,只取第一個key作為key,迭代所有的values。

如果reduce的key是自定義的bean,我們只需要bean裡面的某個屬性相同就認為這樣的key是相同的,並把value放同一個values,進行一次reduce函數的處理,這時我們就需要自定義GroupCoparator來“欺騙”reduce了。

3、job.setPartitionerClass(ItemIdPartitioner.class)

partitioner執行時機:shuffle階段,在mapper執行完成,Reducer還沒有執行的時候,mapper的輸出就是partitioner的輸入 即

目的:shuffle階段,用來決定map輸出時,什麼樣的key輸出到同一個reduce節點

4、job.setSortComparatorClass(SortHot.class)

目的:對進入同一個reduce的鍵或鍵的部分 進行排序

時機:shuffle階段,reduce部分fetch數據之後,合併小文件之後的排序

5、job.setCombinerClass()

時機:map輸出之後,shuffle之前

目的:每一個map可能會產生大量的輸出,combiner的作用就是在map端對輸出先做一次合併,以減少傳輸到reducer的數據量

combiner有類似於本地reducer的功能。如果不使用combiner,那麼所有的結果都是Reduce完成,效率會相對低下。使用combiner,先完成的map會在本地聚合,提升速度。

注意:Combiner的輸出是Reducer的輸入,如果combiner是可插拔的,添加combiner絕對不能改變最終計算結果。所以在可插拔的情況下Combiner只應該用於那種和Reduce的輸入key/value與輸出key/value類型完全一致,且不影響最終結果的場景。比如累加,最大值等。

Hadoop幾個常見類的作用

FileInputFormat.addInputPath(job, new Path("/usr/input/hot"))和FileOutputFormat.setOutputPath(job, new Path("/usr/output/hot"))

設置源數據的輸入目錄,和計算結果的輸出目錄

6、Shuffle原理!

Hadoop幾個常見類的作用

一個切片對應一個map ,每一個map對應一個在內存中的環形緩衝區,用來存儲map的輸出,緩衝區的大小默認是100M

當map向環形緩衝區寫入數據達到一定閥值,就是超出一定範圍(80%) 就會啟動一個後臺線程將緩衝區數據溢寫到磁盤

注意:不僅僅是將數據存入到磁盤,而是經過很複雜的過程,寫入磁盤之前首先將數據按照分區規則進行分區,如果沒有指定分區規則,就會按照Hadoop默認的分區規則進行分區,然後按照排序規則對分區內的數據按照k2(map輸出的key即reducer輸入的key)進行排序(每個分區內調用job.setSortComparatorClass()設置的key比較函數類排序(如果沒有通過job.setSortComparatorClass()設置key比較函數類,則使用key的實現的compareTo方法),如果k2是Text類型 則按照k2的字典順序的排序規則,如果是Bean按照自定義的排序規則排序,然後得到多個分區且排序的小文件。分區是按照分區號排序,分區內的數據是按照k2的排序規則進行排序,小文件內可以有多個分區。

由於map向緩存區中存數據速度遠遠比緩衝區向磁盤寫數據的速度,所以當緩衝區中的數據達到80%,map就會阻塞,停止向緩衝區存入數據,直到緩衝區中的數據寫入到多個小文件並清空緩衝區,才讓map繼續向緩衝區存入數據,之所以寫入多個小文件,是因為文件越小排序的速度越快。

注意:一個分區如果過大可能存在於多個小文件之中。在得到多個分區且排序的小文件後要進行合併,合併規則是按照分區號將多個小文件中的部分分區數據合併成對應分區號的完整分區數據裡面,在合併的同時再對分區內的多個部分數據按照k2排序規則進行一次排序。


分享到:


相關文章: