MapReduce知識整理

MapReduce概述

MapReduce定義

MapReduce是一個分佈式運算程序的編程框架,是用戶開發“基於Hadoop的數據分析應用”的核心框架。

MapReduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分佈式運算程序,並運行在一個Hadoop集群上。

MapReduce優缺點

優點

1.MapReduce易於編程。

它簡單的實現一些接口,就可以完成一個分佈式程序,這個分佈式程序可以分佈到大量廉價的PC機器上運行。也就是說你寫一個分佈式程序,跟寫一個簡單的串行程序是一模一樣的。就是因為這個特點使得MapReduce編程變得非常流行。

2.良好的擴展性

當你的計算資源不能得到滿足的時候,你可以通過簡單的增加機器來擴展它的計算能力。

3.高容錯性

MapReduce設計的初衷就是使程序能夠部署在廉價的PC機器上,這就要求它具有很高的容錯性。比如其中一臺機器掛了,它可以把上面的計算任務轉移到另外的一個節點上運行,不至於這個任務失敗,而且這個過程不需要人工參與,而完全是Hadoop內部完成的。

4.適合PB級以上海量數據的離線處理

可以實現上千臺服務器集群併發工作,提供數據處理能力。

缺點

1.不擅長實時計算

MapReduce無法像MySql一樣,在毫秒或者秒內返回結果。

2.不擅長流式計算

流式計算的輸入數據是動態的,而MapReduce的輸入數據集是靜態的,不能動態變化。這是因為MapReduce自身的設計特點決定了數據源必須是靜態的。

3.不擅長DAG(有向圖)計算

多個應用程序存在依賴關係,後一個應用程序的輸入為前一個輸出。在這種情況下,MapReduce並不能做,而是使用後,每個MapReduce作業的輸出節過都會寫入磁盤,會造成大量的磁盤IO,導致性能非常的低下。

MapReduce核心思想

MapReduce知識整理

1)分佈式的運算程序往往需要分成至少2個階段。

2)第一個階段的MapTask併發實例,完全並行運行,互不相干。

3)第二個階段的ReduceTask併發實例互不相干,但是他們的數據依賴於上一個階段的所有MapTask併發實例的輸出。

4)MapReduce編程模型只能包含一個Map階段和一個Reduce階段,如果用戶的業務邏輯非常複雜,那就只能多個MapReduce程序,串行運行。

總結:分析WordCount數據流走向深入理解MapReduce核心思想。

MapReduce進程

一個完整的MapReduce程序在分佈式運行時有三類實例進程:

1.MrAppMaster:負責整個程序的過程調度及狀態協調。

2.MapTask:負責Map階段的整個數據處理流程。

3.ReduceTask:負責Reduce階段的整個數據處理流程。

官方WordCount源碼

用反編譯工具反編譯源碼,發現WordCount案例有Map類、Reduce類和驅動類。且數據的類型是Hadoop自身封裝的序列化類型。

MapReduce知識整理

MapReduce編程規範

用戶編寫的程序分成三個部分:Mapper、Reducer和Driver。

1.Mapper階段

(1)用戶自定義的Mapper要繼承自己的父類

(2)Mapper的輸入數據是KV對的形式(KV的類型可自定義)

(3)Mapper中的業務邏輯寫在map()方法中

(4) Mapper的輸出數據是KV對的形式(KV的類型可自定義)

(5)map()方法(MapTask進程)對每一個調用一次

2.Reduce階段

(1)用戶自定義的Reducer要繼承自己的分類

(2)Reducer的輸入數據類型對應的Mapper的輸出數據類型,也是KV

(3)Reducer的業務邏輯寫在Reduce()方法中

(4)ReduceTask進程對每一組相同的組調用一次reduce()方法

3.Driver階段

相當於YARN集群的客戶端,用於提交我們整個程序到YARN集群,提交的是封裝了MapReduce程序相關運行參數的job對象。

WordCount案例實操

1.需求

在給定的文本文件中統計輸出每一個單詞出現的總次數

(1)輸入數據,hello.txt

(2)期望輸出數據

<code>atguigu 2
banzhang 1
cls 2
hadoop 1
jiao 1
ss 2
xue 1/<code>

2.需求分析

按照MapReduce編程規範,分別編寫Mapper,Reducer,Driver,

MapReduce知識整理

3.環境準備

(1)創建maven工程

(2)在pom.xml文件中添加如下依賴

<code>
		
			junit
			junit
			RELEASE
		
		
			org.apache.logging.log4j
			log4j-core
			2.8.2
		
		
			org.apache.hadoop
			hadoop-common
			2.7.2
		
		
			org.apache.hadoop
			hadoop-client
			2.7.2
		
		
			org.apache.hadoop
			hadoop-hdfs
			2.7.2
		
/<code>

(3)在項目的src/main/resources目錄下,新建一個文件,命名為“log4j.properties”,在文件中填入。

<code>log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n/<code>

4.編寫程序

(1)編寫Mapper類

<code>import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper{
	
	Text k = new Text();
	IntWritable v = new IntWritable(1);
	
	@Override
	protected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException {
		
		// 1 獲取一行
		String line = value.toString();
		
		// 2 切割
		String[] words = line.split(" ");
		
		// 3 輸出
		for (String word : words) {
			
			k.set(word);
			context.write(k, v);
		}
	}
}/<code>

(2)編寫Reducer類

<code>import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer{

int sum;
IntWritable v = new IntWritable();

	@Override
	protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException {
		
		// 1 累加求和
		sum = 0;
		for (IntWritable count : values) {
			sum += count.get();
		}
		
		// 2 輸出
       v.set(sum);
		context.write(key,v);
	}
}/<code>

(3)編寫Driver驅動類

<code>import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

		// 1 獲取配置信息以及封裝任務
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);

		// 2 設置jar加載路徑
		job.setJarByClass(WordcountDriver.class);

		// 3 設置map和reduce類
		job.setMapperClass(WordcountMapper.class);
		job.setReducerClass(WordcountReducer.class);

		// 4 設置map輸出
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		// 5 設置最終輸出kv類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		// 6 設置輸入和輸出路徑
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 7 提交
		boolean result = job.waitForCompletion(true);

		System.exit(result ? 0 : 1);
	}
}/<code>

5.本地測試

(1)如果電腦系統是win7的就將win7的hadoop jar包解壓到非中文路徑,並在Windows環境上配置HADOOP_HOME環境變量。如果是電腦win10操作系統,就解壓win10的hadoop jar包,並配置HADOOP_HOME環境變量。

注意:win8電腦和win10家庭版操作系統可能有問題,需要重新編譯源碼或者更改操作系統。

(2)在Eclipse/Idea上運行程序

6.集群上測試

(1)用maven打jar包,需要添加的打包插件依賴

注意:標記紅顏色的部分需要替換為自己工程主類

<code>
		
			
				maven-compiler-plugin
				2.3.2
				
					1.8
					1.8
				
			
			
				maven-assembly-plugin 
				
					
						jar-with-dependencies
					
					
						
							com.atguigu.mr.WordcountDriver
						
					
				
				
					
						make-assembly
						package
						
							single
						
					
				
			
		
	/<code>

注意:如果工程上顯示紅叉。在項目上右鍵->maven->update project即可。

(1)將程序打成jar包,然後拷貝到Hadoop集群中

步驟詳情:右鍵->Run as->maven install。等待編譯完成就會在項目的target文件夾中生成jar包。如果看不到。在項目上右鍵-》Refresh,即可看到。修改不帶依賴的jar包名稱為wc.jar,並拷貝該jar包到Hadoop集群。

(2)啟動Hadoop集群

(3)執行WordCount程序

<code>hadoop jar  wc.jar
 com.atguigu.wordcount.WordcountDriver /user/atguigu/input /user/atguigu/output/<code>

Hadoop序列化

序列化概述

序列化就是把內存中的對象,轉換成字節序列(或其他數據傳輸協議)以便於存儲到磁盤(持久化)和網絡傳輸。

反序列就是將收到字節序列(或其他數據傳輸協議)或者是磁盤的持久化數據,轉換成內存中的對象。

為什麼要序列化

一般來說,“活的”對象只生存在內存裡,關機斷電就沒有了。而且“活的”對象只能由本地的進程使用,不能被髮送到網絡上的另外一臺計算機。然而序列化可以存儲“活的對象”,可以將“活的”對象發送到遠程計算機。

為什麼不用Java的序列化

Java的序列化是一個重量級序列化框架,一個對象被序列化後,會附帶很多額外的信息(各種校驗信息,Header,繼承體系等),不便於在網絡中高效傳輸。所以,Hadoop自己開發了一套序列化機制(Writable)。

Hadoop序列化特點:

(1)緊湊:高效使用存儲空間

(2)快速:讀寫數據的額外開銷小

(3)可擴展性:隨著通信協議的升級而可升級

(4)支持多語言的交互

自定義bean對象實現序列化接口(Writable)

在企業開發中往往常用的基本序列化類型不能滿足所有需求,比如在Hadoop框架內部傳遞一個bean對象,那麼該對象就需要實現序列化接口。

具體實現bean對象序列化步驟如下7步。

(1)必須實現Writable接口

(2)反序列化時,需要反射調用空參構造函數,所以必須有空參構造

<code>public FlowBean() {
	super();
}/<code>

(3)重寫序列化方法

<code>@Override
public void write(DataOutput out) throws IOException {
	out.writeLong(upFlow);
	out.writeLong(downFlow);
	out.writeLong(sumFlow);
}/<code>

(4)重寫反序列化方法

<code>@Override
public void readFields(DataInput in) throws IOException {
	upFlow = in.readLong();
	downFlow = in.readLong();
	sumFlow = in.readLong();
}/<code>

(5)注意反序列化的順序和序列化的順序完全一致

(6)要想把結果顯示在文件中,需要重寫toString(),可用”\t”分開,方便後續用。

(7)如果需要將自定義的bean放在key中傳輸,則還需要實現Comparable接口,因為MapReduce框中的Shuffle過程要求對key必須能排序。

<code>@Override
public int compareTo(FlowBean o) {
	// 倒序排列,從大到小
	return this.sumFlow > o.getSumFlow() ? -1 : 1;
}/<code>

序列化案例實操

1. 需求

統計每一個手機號耗費的總上行流量、下行流量、總流量

(1)輸入數據

phone_data.txt

(2)輸入數據格式:

<code>7 13560436666 120.196.100.99 1116  954 200
id 手機號碼 網絡ip 上行流量  下行流量     網絡狀態碼/<code>

(3)期望輸出數據格式

<code>13560436666 		1116		      954 			2070
手機號碼		    上行流量        下行流量		總流量/<code>

2.需求分析

MapReduce知識整理

3.編寫MapReduce程序

(1)編寫流量統計的Bean對象

<code>import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// 1 實現writable接口
public class FlowBean implements Writable{

	private long upFlow;
	private long downFlow;
	private long sumFlow;
	
	//2  反序列化時,需要反射調用空參構造函數,所以必須有
	public FlowBean() {
		super();
	}

	public FlowBean(long upFlow, long downFlow) {
		super();
		this.upFlow = upFlow;
		this.downFlow = downFlow;
		this.sumFlow = upFlow + downFlow;
	}
	
	//3  寫序列化方法
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);
	}
	
	//4 反序列化方法
	//5 反序列化方法讀順序必須和寫序列化方法的寫順序必須一致
	@Override
	public void readFields(DataInput in) throws IOException {
		this.upFlow  = in.readLong();
		this.downFlow = in.readLong();
		this.sumFlow = in.readLong();
	}

	// 6 編寫toString方法,方便後續打印到文本
	@Override
	public String toString() {
		return upFlow + "\t" + downFlow + "\t" + sumFlow;
	}

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDownFlow() {
		return downFlow;
	}

	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}
}/<code>

(2)編寫Mapper類

<code>
		
			junit
			junit
			RELEASE
		
		
			org.apache.logging.log4j
			log4j-core
			2.8.2
		
		
			org.apache.hadoop
			hadoop-common
			2.7.2
		
		
			org.apache.hadoop
			hadoop-client
			2.7.2
		
		
			org.apache.hadoop
			hadoop-hdfs
			2.7.2
		
/<code>

(3)編寫Reducer類

<code>log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n/<code>

(4)編寫Driver驅動類

<code>import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

	public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
		
// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設置
args = new String[] { "e:/input/inputflow", "e:/output1" };

		// 1 獲取配置信息,或者job對象實例
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);

		// 6 指定本程序的jar包所在的本地路徑
		job.setJarByClass(FlowsumDriver.class);

		// 2 指定本業務job要使用的mapper/Reducer業務類
		job.setMapperClass(FlowCountMapper.class);
		job.setReducerClass(FlowCountReducer.class);

		// 3 指定mapper輸出數據的kv類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);

		// 4 指定最終輸出的數據的kv類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		
		// 5 指定job的輸入原始文件所在目錄
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 7 將job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}/<code>

MapReduce框架原理

InputFormat數據輸入

MapReduce知識整理

切片與MapTask並行度決定機制

1.問題引出

MapTask的並行度決定Map階段的任務處理併發度,進而影響到整個Job的處理速度。

思考:1G的數據,啟動8個MapTask,可以提高集群的併發處理能力。那麼1K的數據,也啟動8個MapTask,會提高集群性能嗎?MapTask並行任務是否越多越好呢?哪些因素影響了MapTask並行度?

2.MapTask並行度決定機制

數據塊:Block是HDFS物理上把數據分成一塊一塊。

數據切片:數據切片只是在邏輯上對輸入進行分片,並不會在磁盤上將其切分成片進行存儲。

MapReduce知識整理

Job提交流程源碼和切片源碼詳解

1.Job提交流程源碼詳解

<code>import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper{
	
	Text k = new Text();
	IntWritable v = new IntWritable(1);
	
	@Override
	protected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException {
		
		// 1 獲取一行
		String line = value.toString();
		
		// 2 切割
		String[] words = line.split(" ");
		
		// 3 輸出
		for (String word : words) {
			
			k.set(word);
			context.write(k, v);
		}
	}
}/<code>
MapReduce知識整理

FileInputFormat切片源碼解析(input.getSplits(job))

(1)程序先找你數據存儲的目錄

(2)開始遍歷處理(規範切片)目錄下的每一個文件

(3)遍歷第一個文件ss.txt

a)獲取文件大小fs.sizeOf(ss.txt)

b)計算切片大小

computeSplitSize(Math.max(Math.mini(maxSize,blocksize)))=blocksize=128M

c)默認情況下,切片大小=blocksize

d)開始切,形成

第一個切片:ss.txt——0:128M

第二個切片ss.txt——128:256M

第三個切片ss.txt——256M:300M

(每次切片時,都要判斷切完剩下的部分是否大於塊的1.1倍,不大於1.1倍就劃分一塊切片)

e)將切片信息寫到一個切片規劃文件中

f)整個切片的核心過程在getSplit()方法中完成

g)InputSplit只記錄了切片的元數據信息,比如起始位置,長度以及所在的節點列表等。

(4)提交切片規劃文件到YARN上,YARN上的MrAppMaster就可以根據切片規劃文件計算開啟MapTask個數。

FileInputFormat切片機制

1.切片機制

(1)簡單地按照文件的內容長度進行切片

(2)切片大小,默認等於Block大小

(3)切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片

2.案例分析

(1)輸入數據有兩個文件:

MapReduce知識整理

(2)經過FIleIputFormat的切片機制運算後,形成的切片信息如下:

MapReduce知識整理

FileInputFormat切片大小的參數配置

(1)源碼中計算切片大小的公式

Math.max(minSize,Math.mini(MaxSize,blockSize));

mapreduce.input.fileinputformat.split.minisize=1 默認值為1

mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue默認值Long.MAXValue。因此,默認情況下,切片大小=blocksize。

(2)切片大小設置

maxsize(切片最大值):參數如果調得比blockSize小,則會讓切片變小,而且就等於配置的這個參數的值。

minsize(切片最小值):參數調的比blockSize大,則可以讓切片變得比blockSize還大。

(2)獲取切片信息API

<code>import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer{

int sum;
IntWritable v = new IntWritable();

	@Override
	protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException {
		
		// 1 累加求和
		sum = 0;
		for (IntWritable count : values) {
			sum += count.get();
		}
		
		// 2 輸出
       v.set(sum);
		context.write(key,v);
	}
}/<code>

CombineTextInputFormat切片機制

框架默認的TextInputFormat切片機制是對任務按文件規劃切片,不管文件多小,都會是一個單獨的切片,都會交給一個MapTask,這樣如果有大量小文件,就會產生大量的MapTask,處理效率極其低下。

1、應用場景:

CombineTextInputFormat用於小文件過多的場景,它可以將多個小文件從邏輯上規劃到一個切片中,這樣,多個小文件就可以交給一個MapTask處理。

2、虛擬存儲切片最大值設置

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

注意:虛擬存儲切片最大值設置最好根據實際的小文件大小情況來設置具體的值。

3、切片機制

生成切片過程包括:虛擬存儲過程和切片過程二部分。

MapReduce知識整理

(1)虛擬存儲過程:

將輸入目錄下所有文件大小,依次和設置的setMaxInputSplitSize值比較,如果不大於設置的最大值,邏輯上劃分一個塊。如果輸入文件大於設置的最大值且大於兩倍,那麼以最大值切割一塊;當剩餘數據大小超過設置的最大值且不大於最大值2倍,此時將文件均分成2個虛擬存儲塊(防止出現太小切片)。

例如setMaxInputSplitSize值為4M,輸入文件大小為8.02M,則先邏輯上分成一個4M。剩餘的大小為4.02M,如果按照4M邏輯劃分,就會出現0.02M的小的虛擬存儲文件,所以將剩餘的4.02M文件切分成(2.01M和2.01M)兩個文件。

(2)切片過程:

(a)判斷虛擬存儲的文件大小是否大於setMaxInputSplitSize值,大於等於則單獨形成一個切片。

(b)如果不大於則跟下一個虛擬存儲文件進行合併,共同形成一個切片。

(c)測試舉例:有4個小文件大小分別為1.7M、5.1M、3.4M以及6.8M這四個小文件,則虛擬存儲之後形成6個文件塊,大小分別為:

1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)

最終會形成3個切片,大小分別為:(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

CombineTextInputFormat案例實操

1.需求

將輸入的大量小文件合併成一個切片統一處理。

(1)輸入數據

準備4個小文件

(2)期望

期望一個切片處理4個文件

2.實現過程

(1)不做任何處理,運行1.6節的WordCount案例程序,觀察切片個數為4。


(2)在WordcountDriver中增加如下代碼,運行程序,並觀察運行的切片個數為3。

(a)驅動類中添加代碼如下:

<code>// 如果不設置InputFormat,它默認用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);

//虛擬存儲切片最大值設置4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);/<code> 

(b)運行如果為3個切片。

MapReduce知識整理

(3)在WordcountDriver中增加如下代碼,運行程序,並觀察運行的切片個數為1。

(a)驅動中添加代碼如下:

<code>// 如果不設置InputFormat,它默認用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);

//虛擬存儲切片最大值設置20m
CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);/<code>

(b)運行如果為1個切片。

MapReduce知識整理

FileInputFormat實現類

思考:在運行MapReduce程序時,輸入文件格式包括:基於行的日誌文件,二進制格式文件,數據庫表等。那麼,針對不同的數據類型,MapReduce是如何讀取這些數據的呢?

FileInputFormat常見的接口實現類包括:TextInputFormat,KeyValueInputFormat,NLineInputFormat,CombineTextInputFormat和自定義InputFormat等。

1.TextInputFormat

TextInputFormat是默認的FileInputFormat實現類。按行讀取每條記錄。鍵是存儲該行在整個文件中的起始字節偏移量,LongWritable類型。值是這樣的內容,不包括任何行終止符(換行符和回車符),Text類型。

以下是一個示例,比如,一個分片包含了如下4條文本記錄。

MapReduce知識整理

每條記錄表為以下鍵/值對:

MapReduce知識整理

2.KeyValueTextInputFormat

每一行均為一條記錄,被分隔符分割為key,value。可以通過在驅動類中設置

conf.set(Key ValueLineRecordReader.KEY_VALUE_SEPERATOR,"/t");來設定分隔符。

默認分隔符是tab(\t)。

以下是一個示例,輸入是一個包含4條記錄的分片。其中--->表示一個(水平方向的)製表符。

MapReduce知識整理

每條記錄表示為以下鍵/值對:

MapReduce知識整理

此時的鍵是每行排在製表符之前的Text序列。

3.NLineInputFormat

如果使用NlineInputFormat,代表每個map進程處理的InputSplit不再按Block塊去劃分,而是按NlineInputFormat指定的行數N來劃分。即輸入文件的總行數/N=切片數,如果不整除,切片數=商+1。

以下是一個示例,仍然以上面的4行輸入為例。

MapReduce知識整理

例如,如果N是2,則每個輸入分片包含兩行。開啟2個MapTask。

MapReduce知識整理

這裡的鍵和值與TextInputFormat生成的一樣。

KeyValueTextInputFormat使用案例

1.需求

統計輸入文件中每一行的第一個單詞相同的行數。

(1)輸入數據

<code>banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang/<code>

(2)期望結果數據

<code>banzhang	2
xihuan	2/<code>

2.需求分析

MapReduce知識整理

3.代碼實現

(1)編寫Mapper類

<code>import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KVTextMapper extends Mapper{
	
// 1 設置value
   LongWritable v = new LongWritable(1);  
    
	@Override
	protected void map(Text key, Text value, Context context)
			throws IOException, InterruptedException {

// banzhang ni hao
        
        // 2 寫出
        context.write(key, v);  
	}
}/<code>

(2)編寫Reducer類

<code>import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KVTextReducer extends Reducer{
	
    LongWritable v = new LongWritable();  
    
	@Override
	protected void reduce(Text key, Iterable values,	Context context) throws IOException, InterruptedException {
		
		 long sum = 0L;  

		 // 1 彙總統計
        for (LongWritable value : values) {  
            sum += value.get();  
        }
         
        v.set(sum);  
         
        // 2 輸出
        context.write(key, v);  
	}
}/<code>

(3)編寫Driver類

<code>import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KVTextDriver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
		Configuration conf = new Configuration();
		// 設置切割符
	conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
		// 1 獲取job對象
		Job job = Job.getInstance(conf);
		
		// 2 設置jar包位置,關聯mapper和reducer
		job.setJarByClass(KVTextDriver.class);
		job.setMapperClass(KVTextMapper.class);
    job.setReducerClass(KVTextReducer.class);
				
		// 3 設置map輸出kv類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);

		// 4 設置最終輸出kv類型
		job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
		
		// 5 設置輸入輸出數據路徑
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		
		// 設置輸入格式
	  job.setInputFormatClass(KeyValueTextInputFormat.class);
		
		// 6 設置輸出數據路徑
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// 7 提交job
		job.waitForCompletion(true);
	}
}/<code> 

NLineInputFormat使用案例

1.需求

對每個單詞進行個數統計,要求根據每個輸入文件的行數來規定輸出多少個切片。此案例要求每三行放入一個切片中。

(1)輸入數據

<code>banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang banzhang ni hao
xihuan hadoop banzhang/<code>

(2)期望輸出數據

<code>Number of splits:4/<code>

2.需求分析

MapReduce知識整理

3.代碼實現

(1)編寫Mapper類

<code>import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NLineMapper extends Mapper{
	
	private Text k = new Text();
	private LongWritable v = new LongWritable(1);
	
	@Override
	protected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException {
		
		 // 1 獲取一行
        String line = value.toString();
        
        // 2 切割
        String[] splited = line.split(" ");
        
        // 3 循環寫出
        for (int i = 0; i < splited.length; i++) {
        	
        	k.set(splited[i]);
        	
           context.write(k, v);
        }
	}
}/<code>

(2)編寫Reducer類

<code>import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class NLineReducer extends Reducer{
	
	LongWritable v = new LongWritable();
	
	@Override
	protected void reduce(Text key, Iterable values,	Context context) throws IOException, InterruptedException {
		
        long sum = 0l;

        // 1 彙總
        for (LongWritable value : values) {
            sum += value.get();
        }  
        
        v.set(sum);
        
        // 2 輸出
        context.write(key, v);
	}
}/<code>

(3)編寫Driver類

<code>import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NLineDriver {
	
	public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
		
// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設置
args = new String[] { "e:/input/inputword", "e:/output1" };

		 // 1 獲取job對象
		 Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        
        // 7設置每個切片InputSplit中劃分三條記錄
        NLineInputFormat.setNumLinesPerSplit(job, 3);
          
        // 8使用NLineInputFormat處理記錄數  
        job.setInputFormatClass(NLineInputFormat.class);  
          
        // 2設置jar包位置,關聯mapper和reducer
        job.setJarByClass(NLineDriver.class);  
        job.setMapperClass(NLineMapper.class);  
        job.setReducerClass(NLineReducer.class);  
        
        // 3設置map輸出kv類型
        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(LongWritable.class);  
        
        // 4設置最終輸出kv類型
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(LongWritable.class);  
          
        // 5設置輸入輸出數據路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  
          
        // 6提交job
        job.waitForCompletion(true);  
	}
}/<code>

4.測試

(1)輸入數據

<code>banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang banzhang ni hao
xihuan hadoop banzhang/<code>

(2)輸出結果的切片數

MapReduce知識整理

自定義InputFormat

在企業開發中,Hadoop框架自帶的InputFormat類型不能滿足所有應用場景,需要自定義InputFormat來解決實際問題。

自定義InputFormat步驟如下:

(1)自定義一個類繼承FileInputFormat

(2)改寫RecordReader,實現一個讀取一個完整文件封裝為KV

(3)在輸出時使用SequenceFileOutPutFormat輸出合併文件。

自定義InputFormat案例實操

無論HDFS還是MapReduce,在處理小文件時效率都非常低,但又難免面臨處理大量小文件的場景,此時,就需要有相應解決方案。可以自定義InputFormat實現小文件的合併。

1.需求

將多個小文件合併成一個SequenceFile文件(SequenceFile文件是Hadoop用來存儲二進制形式的key-value對的文件格式),SequenceFile裡面存儲著多個文件,存儲的形式為文件路徑+名稱為key,文件內容為value。

(1)輸入數據

MapReduce知識整理

(2)期望輸出文件格式

MapReduce知識整理

2.需求分析

1.自定義一個類繼承FileInputFormat

(1)重寫isSplitable()方法,返回false不可切割

(2)重寫createRecordReader(),創建自定義的RecordReader對象,並初始化

2.改寫RecordReader,實現一次讀取一個完整文件封裝為KV

(1)採用IO流一次讀取一個文件輸出到value中,因為設置了不可切片,最終把所有文件封裝了value中。

(2)獲取文件路徑信息+名稱,並設置key

3.設置Driver

MapReduce知識整理

3.程序實現

(1)自定義InputFromat

<code>import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// 定義類繼承FileInputFormat
public class WholeFileInputformat extends FileInputFormat{
	
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}

	@Override
	public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context)	throws IOException, InterruptedException {
		
		WholeRecordReader recordReader = new WholeRecordReader();
		recordReader.initialize(split, context);
		
		return recordReader;
	}
}/<code>

(2)自定義RecordReader類

<code>import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeRecordReader extends RecordReader{

	private Configuration configuration;
	private FileSplit split;
	
	private boolean isProgress= true;
	private BytesWritable value = new BytesWritable();
	private Text k = new Text();

	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		
		this.split = (FileSplit)split;
		configuration = context.getConfiguration();
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		
		if (isProgress) {

			// 1 定義緩存區
			byte[] contents = new byte[(int)split.getLength()];
			
			FileSystem fs = null;
			FSDataInputStream fis = null;
			
			try {
				// 2 獲取文件系統
				Path path = split.getPath();
				fs = path.getFileSystem(configuration);
				
				// 3 讀取數據
				fis = fs.open(path);
				
				// 4 讀取文件內容
				IOUtils.readFully(fis, contents, 0, contents.length);
				
				// 5 輸出文件內容
				value.set(contents, 0, contents.length);

       // 6 獲取文件路徑及名稱
       String name = split.getPath().toString();

       // 7 設置輸出的key值
       k.set(name);

			} catch (Exception e) {
				
			}finally {
				IOUtils.closeStream(fis);
			}
			
			isProgress = false;
			
			return true;
		}
		
		return false;
	}

	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return k;
	}

	@Override
	public BytesWritable getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		return 0;
	}

	@Override
	public void close() throws IOException {
	}
}/<code>

(3)編寫SequenceFileMapper類處理流程

<code>import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SequenceFileMapper extends Mapper{
	
	@Override
	protected void map(Text key, BytesWritable value,			Context context)		throws IOException, InterruptedException {

		context.write(key, value);
	}
}/<code>

(4)編寫SequenceFileReducer類處理流程

<code>import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SequenceFileReducer extends Reducer {

	@Override
	protected void reduce(Text key, Iterable values, Context context)		throws IOException, InterruptedException {

		context.write(key, values.iterator().next());
	}
}/<code>

(5)編寫SequenceFileDriver類處理流程

<code>package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SequenceFileReducer extends Reducer {

	@Override
	protected void reduce(Text key, Iterable values, Context context)		throws IOException, InterruptedException {

		context.write(key, values.iterator().next());
	}
}
(5)編寫SequenceFileDriver類處理流程
package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileDriver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
       // 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設置
		args = new String[] { "e:/input/inputinputformat", "e:/output1" };

       // 1 獲取job對象
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

       // 2 設置jar包存儲位置、關聯自定義的mapper和reducer
		job.setJarByClass(SequenceFileDriver.class);
		job.setMapperClass(SequenceFileMapper.class);
		job.setReducerClass(SequenceFileReducer.class);

       // 7設置輸入的inputFormat
		job.setInputFormatClass(WholeFileInputformat.class);

       // 8設置輸出的outputFormat
	 job.setOutputFormatClass(SequenceFileOutputFormat.class);
       
// 3 設置map輸出端的kv類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(BytesWritable.class);
		
       // 4 設置最終輸出端的kv類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(BytesWritable.class);

       // 5 設置輸入輸出路徑
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

       // 6 提交job
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}
3.2 MapReduce工作流程/<code>

MapReduce工作流程

1.流程示意圖

MapReduce知識整理

MapReduce知識整理

2.流程詳解

上面的流程是整個MapReduce最全工作流程,但是Shuffle過程只是從第7步開始到第16步結束,具體Shuffle過程詳解,如下:

1)MapTask收集我們的map()方法輸出的kv對,放到內存緩衝區中

2)從內存緩衝區不斷溢出本地磁盤文件,可能會溢出多個文件

3)多個溢出文件會被合併成大的溢出文件

4)在溢出過程及合併的過程中,都要調用Partitioner進行分區和針對key進行排序

5)ReduceTask根據自己的分區號,去各個MapTask機器上取相應的結果分區數據

6)ReduceTask會取到同一個分區的來自不同MapTask的結果文件,ReduceTask會將這些文件再進行合併(歸併排序)

7)合併成大文件後,Shuffle的過程也就結束了,後面進入ReduceTask的邏輯運算過程(從文件中取出一個一個的鍵值對Group,調用用戶自定義的reduce()方法)

3.注意

Shuffle中的緩衝區大小會影響到MapReduce程序的執行效率,原則上說,緩衝區越大,磁盤io的次數越少,執行速度就越快。

緩衝區的大小可以通過參數調整,參數:io.sort.mb默認100M。

4.源碼解析流程

<code>context.write(k, NullWritable.get());
output.write(key, value);
collector.collect(key, value,partitioner.getPartition(key, value, partitions));
	HashPartitioner();
collect()
	close()
	collect.flush()
sortAndSpill()
	sort()   QuickSort
mergeParts();
	//file.out
 //file.out.index
collector.close();/<code>

Shuffle機制

Map方法之後,Reduce方法之前的數據處理過程稱之為Shuffle。

MapReduce知識整理

MapTask工作機制

MapReduce知識整理

1)Read階段:MapTask通過用戶編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。

(2)Map階段:該節點主要是將解析出的key/value交給用戶編寫map()函數處理,併產生一系列新的key/value。

(3)Collect收集階段:在用戶編寫map()函數中,當數據處理完成後,一般會調用OutputCollector.collect()輸出結果。在該函數內部,它會將生成的key/value分區(調用Partitioner),並寫入一個環形內存緩衝區中。

(4)Spill階段:即“溢寫”,當環形緩衝區滿後,MapReduce會將數據寫到本地磁盤上,生成一個臨時文件。需要注意的是,將數據寫入本地磁盤之前,先要對數據進行一次本地排序,並在必要時對數據進行合併、壓縮等操作。

溢寫階段詳情:

步驟1:利用快速排序算法對緩存區內的數據進行排序,排序方式是,先按照分區編號Partition進行排序,然後按照key進行排序。這樣,經過排序後,數據以分區為單位聚集在一起,且同一分區內所有數據按照key有序。

步驟2:按照分區編號由小到大依次將每個分區中的數據寫入任務工作目錄下的臨時文件output/spillN.out(N表示當前溢寫次數)中。如果用戶設置了Combiner,則寫入文件之前,對每個分區中的數據進行一次聚集操作。

步驟3:將分區數據的元信息寫到內存索引數據結構SpillRecord中,其中每個分區的元信息包括在臨時文件中的偏移量、壓縮前數據大小和壓縮後數據大小。如果當前內存索引大小超過1MB,則將內存索引寫到文件output/spillN.out.index中。

(5)Combine階段:當所有數據處理完成後,MapTask對所有臨時文件進行一次合併,以確保最終只會生成一個數據文件。

當所有數據處理完後,MapTask會將所有臨時文件合併成一個大文件,並保存到文件output/file.out中,同時生成相應的索引文件output/file.out.index。

在進行文件合併過程中,MapTask以分區為單位進行合併。對於某個分區,它將採用多輪遞歸合併的方式。每輪合併io.sort.factor(默認10)個文件,並將產生的文件重新加入待合併列表中,對文件排序後,重複以上過程,直到最終得到一個大文件。

讓每個MapTask最終只生成一個數據文件,可避免同時打開大量文件和同時讀取大量小文件產生的隨機讀取帶來的開銷。

ReduceTask工作機制

1.ReduceTask工作機制

MapReduce知識整理

(1)Copy階段:ReduceTask從各個MapTask上遠程拷貝一片數據,並針對某一片數據,如果其大小超過一定閾值,則寫到磁盤上,否則直接放到內存中。

(2)Merge階段:在遠程拷貝數據的同時,ReduceTask啟動了兩個後臺線程對內存和磁盤上的文件進行合併,以防止內存使用過多或磁盤上文件過多。

(3)Sort階段:按照MapReduce語義,用戶編寫reduce()函數輸入數據是按key進行聚集的一組數據。為了將key相同的數據聚在一起,Hadoop採用了基於排序的策略。由於各個MapTask已經實現對自己的處理結果進行了局部排序,因此,ReduceTask只需對所有數據進行一次歸併排序即可。

(4)Reduce階段:reduce()函數將計算結果寫到HDFS上。

2.設置ReduceTask並行度(個數)

ReduceTask的並行度同樣影響整個Job的執行併發度和執行效率,但與MapTask的併發數由切片數決定不同,ReduceTask數量的決定是可以直接手動設置:

MapReduce知識整理

3.實驗:測試ReduceTask多少合適

1)實驗環境:1個Master節點,16個Slave節點:CPU:8GHZ,內存: 2G

(2)實驗結論:

MapReduce知識整理

4.注意事項

(1)如果ReduceTask=0,表示沒有Reduce階段,輸出文件個數和Map個數一致。

(2)ReduceTask默認值就是1,所以輸出文件個數為一個。

(3)如果數據分佈不均勻,就有可能在Reduce階段產生數據傾斜。

(4)ReduceTask數量並不是任意設置,還要考慮業務邏輯需求,有些情況下,需要計算全局彙總結果,就只能有1個ReduceTask。

(5)具體多少個ReduceTask,需要根據集群性能而定。

(6)如果分區數不是1,但是ReduceTask為1,是否執行分區過程。答案是:不執行分區過程。因為在MapTask的源碼中,執行分區的前提是先判斷ReduceNum個數是否大於1。大於1肯定不執行。


分享到:


相關文章: