MapReduce Definition
MapReduce is a programming framework for distributed computation, and the core framework on which users develop Hadoop-based data analysis applications.
Its core function is to combine the user's business-logic code with the framework's built-in default components into one complete distributed program that runs in parallel on a Hadoop cluster.
Pros and Cons of MapReduce
1. Pros
1.1. Easy to program
1.2. Good scalability
1.3. High fault tolerance
- If one machine goes down, the compute tasks running on it are automatically migrated to another machine, with no manual intervention required
1.4. Well suited to offline processing of massive data at PB scale and above
2. Cons
2.1. Not suited to real-time computation
2.2. Not suited to stream processing
2.3. Not suited to DAG (directed acyclic graph) computation
- When multiple jobs depend on each other, with each job's input being the previous job's output, MapReduce is not incapable of doing it; rather, every MapReduce job writes its output to disk, which causes heavy disk I/O and poor performance
MapReduce Processes
- MRAppMaster: responsible for scheduling the entire job and coordinating its state
- MapTask: responsible for the data-processing flow of the Map phase
- ReduceTask: responsible for the data-processing flow of the Reduce phase
Common Data Serialization Types
Java type    Hadoop Writable type
boolean      BooleanWritable
byte         ByteWritable
int          IntWritable
float        FloatWritable
long         LongWritable
double       DoubleWritable
String       Text
Map          MapWritable
array        ArrayWritable
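Each Writable type in the table implements Hadoop's Writable interface (write()/readFields()), which is the compact serialization format Hadoop uses to move keys and values between tasks. A minimal round-trip sketch of a (Text, IntWritable) pair, assuming hadoop-common is on the classpath (the class and method names here are illustrative, not part of any API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {

    // Serialize a (Text, IntWritable) pair to bytes and read it back,
    // the same way Hadoop moves key/value pairs between tasks.
    // Returns the count if the key survived the round trip, -1 otherwise.
    static int roundTrip(String word, int count) throws IOException {
        Text key = new Text(word);
        IntWritable value = new IntWritable(count);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        key.write(out);      // Writable.write: serialize to the stream
        value.write(out);

        DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        Text keyCopy = new Text();
        IntWritable valueCopy = new IntWritable();
        keyCopy.readFields(in);   // Writable.readFields: deserialize from the stream
        valueCopy.readFields(in);

        return keyCopy.toString().equals(word) ? valueCopy.get() : -1;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("aa", 2));
    }
}
```

Note that deserialization fills an existing empty object via readFields() rather than constructing a new one, which is why the mapper and reducer below reuse a single Text/IntWritable instance instead of allocating per record.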
MapReduce Programming Conventions
- Mapper
- A user-defined Mapper extends the Mapper parent class and overrides its map() method
- The Mapper's business logic goes inside the map() method
- The Mapper's input and output are both in key/value-pair form
- Reducer
- A user-defined Reducer extends the Reducer parent class and overrides its reduce() method
- The Reducer's business logic goes inside the reduce() method
- The Reducer's input and output are both in key/value-pair form
- reduce() is called once for each group of identical keys
- Driver
- Acts as the client of the YARN cluster: it builds a Job object that encapsulates the MapReduce job's runtime parameters and submits the whole program to the YARN cluster
WordCount Walkthrough
- Requirement: count the number of occurrences of each word in a file stored in HDFS; the path and contents are as follows:
<code>
[root@bbx hadoop-3.1.3]# bin/hdfs dfs -ls -R /home/input/
-rw-r--r--   1 root supergroup         30 2020-05-02 17:05 /home/input/name
[root@bbx hadoop-3.1.3]# bin/hdfs dfs -cat /home/input/name
2020-05-04 16:47:12,295 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
aa bb ss aa cc dd bb cc dd ee
</code>
- Maven dependencies (pom.xml)
<code>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
    </parent>
    <groupId>com.bbx</groupId>
    <artifactId>wcdemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>wcdemo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.3</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.3</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.bbx.wcdemo.WcdemoApplication</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
</code>
- Custom Mapper
<code>
package com.bbx.wcdemo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text text = new Text();
    IntWritable intWritable = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // The input value is one line of the file; split it into words
        String[] values = value.toString().split(" ");
        for (String v : values) {
            text.set(v);
            // Emit each word as a (word, 1) pair, e.g. (bbx, 1); duplicates are not merged here
            context.write(text, intWritable);
        }
    }
}
</code>
- Custom Reducer
<code>
package com.bbx.wcdemo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable intWritable = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // Keys arrive grouped in sorted order; sum the values within each key's group
        for (IntWritable value : values) {
            sum += value.get();
        }
        intWritable.set(sum);
        context.write(key, intWritable);
    }
}
</code>
- Driver
<code>
package com.bbx.wcdemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WcdemoApplication {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // Locate the jar containing the driver, then wire up the mapper and reducer
        job.setJarByClass(WcdemoApplication.class);
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReduce.class);

        // Output key/value types of the map phase, then of the whole job
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output paths come from the command line
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
</code>
- Running in the cluster environment (the output path must not already exist)
<code>
hadoop jar wcdemo-0.0.1-SNAPSHOT.jar com.bbx.wcdemo.WcdemoApplication /home/input/ /home/output
</code>
- For the sample input above, the result file /home/output/part-r-00000 should contain: aa 2, bb 2, cc 2, dd 2, ee 1, ss 1