Big Data Hadoop (Part 2): Complete Notes


The Combiner Component

(1) The Combiner is a component of an MR program in addition to the Mapper and the Reducer.

(2) The Combiner's parent class is Reducer.

(3) The difference between a Combiner and a Reducer is where they run:

<code>The Combiner runs on each MapTask node and performs a local aggregation.
The Reducer receives the global output of all Mappers.</code>

(4) The point of the Combiner is to locally aggregate the output of each MapTask so as to reduce the amount of data sent over the network.

(5) The precondition for using a Combiner is that it must not change the final business result, and the Combiner's output key/value types must match the Reducer's input key/value types.


Application Scenarios

The Combiner is not suitable for every kind of business logic.

<code>1. The Combiner is suitable for additive (accumulating) computations.
2. It is not suitable for avg() (computing an average). Why?
   Example: the average of 0, 20, 10, 25, 15 is (0+20+10+25+15)/5 = 14 (global aggregation).
   With local aggregation: {(0+20+10)/3 = 10, first local aggregate} {(25+15)/2 = 20, second local aggregate} {(10+20)/2 = 15, global computation}

   Doing the computation globally does not affect the final result, but averaging the local averages changes it,
   so the Combiner is not suitable for computing averages.</code>

Usage

(1) Create a Combiner class that extends Reducer.

(2) In the driver class (the Job class), register it on the job object with job.setCombinerClass(); a minimal sketch follows below.
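
As a concrete illustration, here is a minimal sketch of a word-count Combiner, assuming a Mapper that emits (word, 1) pairs as (Text, IntWritable); the class name WCCombiner and its package are hypothetical, not part of the original notes.

<code>package com.baizhi.example; // hypothetical package

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * A Combiner is just a Reducer that runs on each MapTask's local output.
 * Its output KV types must match the real Reducer's input KV types,
 * so here both are (Text, IntWritable).
 * */
public class WCCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        // Sum the counts for this word locally, before they leave the MapTask node
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}</code>

In the driver it is registered with job.setCombinerClass(WCCombiner.class); when the reduce logic is a plain sum like this, the existing Reducer class can often be passed to setCombinerClass directly.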

Example

(1) The WordCount example

(2) job.setCombinerClass();

Before setting the CombinerClass:


After setting the CombinerClass:


The MapReduce Process

First, the client node submits the Job. The InputFormat splits the input data and feeds it to the map function; once the map phase finishes, the results are handed to the reduce function for processing, and the final output is written.


Shuffle

  • What the InputFormat does
  • How is the input split? (a split-size sketch follows after this list)
  • How is the data read?
  • The map function applies user-defined rules to process the data.
  • So that the Reduce side can process the Map output in parallel, the Map output is partitioned, sorted, combined, and merged; the resulting intermediate data is then handed to the corresponding Reduce tasks. This process is called Shuffle. Going from unordered to ordered data, "shuffle" is a very fitting name.
  • Reduce takes the intermediate results as input, processes them with user-defined rules, and hands the output to the OutputFormat.
  • OutputFormat
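
A note on how splits are sized: FileInputFormat-style input formats derive the split size from the HDFS block size and the configured minimum/maximum split sizes. A minimal sketch of that calculation, assuming the default 128 MB block size and the standard property names mapreduce.input.fileinputformat.split.minsize / split.maxsize:

<code>public class SplitSizeSketch {

    // splitSize = max(minSize, min(maxSize, blockSize))
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // assumed HDFS block size: 128 MB
        long minSize = 1L;                   // mapreduce.input.fileinputformat.split.minsize
        long maxSize = Long.MAX_VALUE;       // mapreduce.input.fileinputformat.split.maxsize
        // With default settings the split size equals the block size
        System.out.println(computeSplitSize(blockSize, minSize, maxSize)); // 134217728
    }
}</code>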


Shuffle is the core of MR; it describes the stage in which data goes from being MapTask output to becoming ReduceTask input.

When a Hadoop cluster runs real production jobs, most MapTasks and ReduceTasks are not on the same node, so the Reduce side has to fetch intermediate results from other nodes over the network. When the cluster runs many Jobs at once, the ReduceTasks can therefore consume a large share of the cluster's network resources. This consumption is normal and unavoidable, but we can take measures to reduce the amount of network transfer as much as possible (for example by using a Combiner).

Another point: alongside network transfer, disk I/O also has a large impact on how efficiently a Job runs.

So, from the analysis above, the basic requirements for the Shuffle process are:

  • Pull the data completely from the MapTask side to the ReduceTask side.
  • While pulling the data, deliberately keep the consumption of network resources as low as possible.
  • Keep the impact of disk I/O on task execution as small as possible.


Summary: Shuffle is the process of partitioning, sorting, combining, and merging the Map output and handing the result to the Reduce side.

Map-side Shuffle

(1) The map output is written into an in-memory buffer.

(2) When the buffer reaches its threshold, the contents are spilled to disk (see the tuning sketch below).

(3) The spill files are sorted within each partition, combined, and merged into one large file of (key, value-list) records.
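
The buffer size and spill threshold mentioned in (2) can be tuned from the driver. A minimal sketch, assuming the standard property names mapreduce.task.io.sort.mb and mapreduce.map.sort.spill.percent, set on the Configuration before Job.getInstance(conf); the values shown are illustrative:

<code>// In the driver, before Job.getInstance(conf)
conf.setInt("mapreduce.task.io.sort.mb", 200);            // spill buffer size in MB (default 100)
conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f); // spill when the buffer is 80% full</code>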


Reduce-side Shuffle

(1) Fetch the data (the ReduceTask copies its partitions from the MapTask nodes; see the sketch below).

(2) Merge the fetched data.

(3) Feed the merged data to the reduce task.
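
The fetch step in (1) copies map output with a pool of parallel copier threads whose number can be tuned. A minimal sketch, assuming the standard property name mapreduce.reduce.shuffle.parallelcopies (default 5):

<code>// In the driver, before Job.getInstance(conf)
conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);</code>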



Programming Examples


Sorted Output

Data (columns: phone number, upload traffic, download traffic, total traffic, area):

<code>15117826008 83 218 301 zz
15617126008 83 218 302 zz
16519756181 150 164 315 tj
16519756281 150 164 314 tj
18611781131 123 112 237 sh
18611781113 123 112 235 sh
18711671123 45 186 232 bj
18711671133 45 186 233 bj
18711671233 45 186 234 tw
18711631233 45 186 235 xy</code>


<code>package com.baizhi.test07;

import com.baizhi.test06.OwnPartitioner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SortJob {

    public static void main(String[] args) throws Exception {

        //System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
        //
        // conf.addResource("conf2/core-site.xml");
        // conf.addResource("conf2/hdfs-site.xml");
        // conf.addResource("conf2/mapred-site.xml");
        // conf.addResource("conf2/yarn-site.xml");
        // conf.set("mapreduce.app-submission.cross-platform", "true");
        //
        // conf.set(MRJobConfig.JAR, "F:\\大數據\\代碼\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
        //

        Job job = Job.getInstance(conf);

        /*
         * Set the job jar by class
         * */
        job.setJarByClass(SortJob.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        /*
         * Use a custom partitioner (not needed here)
         * */
        // job.setPartitionerClass(OwnPartitioner.class);


        // job.setNumReduceTasks(1);


        TextInputFormat.setInputPaths(job, new Path("F:\\大數據\\筆記\\Day02-Hadoop\\數據文件\\sumFlow.txt"));
        //TextInputFormat.setInputPaths(job, new Path("/flow.txt"));
        TextOutputFormat.setOutputPath(job, new Path("F:\\大數據\\筆記\\Day02-Hadoop\\數據文件\\outS111"));
        //TextOutputFormat.setOutputPath(job, new Path("/out111111"));


        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);


        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);


        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);

        job.waitForCompletion(true);

    }
}</code>
<code>package com.baizhi.test07;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the current line on spaces
        String[] flowinfos = value.toString().split(" ");

        /*
         * Index 0: phone number
         * */
        String phone = flowinfos[0];
        /*
         * Index 1: upload traffic
         * */
        Long upFlow = Long.valueOf(flowinfos[1]);

        /*
         * Index 2: download traffic
         * */
        Long downFlow = Long.valueOf(flowinfos[2]);
        // Index 3: total traffic
        Long sumFlow = Long.valueOf(flowinfos[3]);


        /*
         * Prepare the bean object
         * */
        FlowBean flowBean = new FlowBean();

        flowBean.setPhone(phone);
        flowBean.setUpFlow(upFlow);
        flowBean.setDownFlow(downFlow);
        /*
         * Set the total traffic (upload + download)
         * */
        flowBean.setSumFlow(sumFlow);

        /*
         * Write the record out; the bean is the key, so the framework sorts by it
         * */
        context.write(flowBean, NullWritable.get());

    }
}</code>
<code>package com.baizhi.test07;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable> {
    @Override
    protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Keys arrive already sorted by FlowBean.compareTo; just write them out
        context.write(key, NullWritable.get());
    }
}</code>


<code>package com.baizhi.test07;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private String phone;
    private Long upFlow;
    private Long downFlow;
    private Long sumFlow;

    public FlowBean() {
    }

    public FlowBean(String phone, Long upFlow, Long downFlow, Long sumFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public Long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }

    public Long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Long downFlow) {
        this.downFlow = downFlow;
    }

    public Long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(Long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return "FlowBean{" +
                "phone='" + phone + '\'' +
                ", upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
    }

    /*
     * Serialization (encoding): write the fields out
     * */
    public void write(DataOutput dataOutput) throws IOException {

        dataOutput.writeUTF(this.phone);
        dataOutput.writeLong(this.upFlow);
        dataOutput.writeLong(this.downFlow);
        dataOutput.writeLong(this.sumFlow);
    }


    /*
     * Deserialization (decoding): read the fields back in the same order
     * */
    public void readFields(DataInput dataInput) throws IOException {

        this.phone = dataInput.readUTF();
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    /*
     * Ascending order by total traffic.
     * Compare the long values, not the Long references (== on boxed Longs would be wrong).
     * */
    public int compareTo(FlowBean flowBean) {
        return Long.compare(this.sumFlow, flowBean.sumFlow);
    }
}</code>

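To make the write/readFields contract above concrete, here is a minimal standalone sketch that round-trips a FlowBean through a byte stream. The demo class, its main method, and the stream setup are illustrative assumptions (placed in the same package as FlowBean), not part of the original job.

<code>package com.baizhi.test07; // hypothetical demo class, alongside FlowBean

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanSerDeDemo {
    public static void main(String[] args) throws Exception {
        FlowBean in = new FlowBean("15117826008", 83L, 218L, 301L);

        // Serialize: write() encodes the fields into bytes
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bos));

        // Deserialize: readFields() decodes them back, in the same field order
        FlowBean out = new FlowBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        System.out.println(out); // FlowBean{phone='15117826008', upFlow=83, downFlow=218, sumFlow=301}
    }
}</code>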

Sorted and Partitioned Output

Data (same format as above: phone, upload traffic, download traffic, total traffic, area):

<code>15117826008 83 218 301 zz
15617126008 83 218 302 zz
16519756181 150 164 315 tj
16519756281 150 164 314 tj
18611781131 123 112 237 sh
18611781113 123 112 235 sh
18711671123 45 186 232 bj
18711671133 45 186 233 bj
18711671233 45 186 234 tw
18711631233 45 186 235 xy</code>


<code>package com.baizhi.test08;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private String phone;
    private Long upFlow;
    private Long downFlow;
    private Long sumFlow;

    public FlowBean() {
    }

    public FlowBean(String phone, Long upFlow, Long downFlow, Long sumFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public Long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }

    public Long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Long downFlow) {
        this.downFlow = downFlow;
    }

    public Long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(Long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return "FlowBean{" +
                "phone='" + phone + '\'' +
                ", upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
    }

    /*
     * Serialization (encoding): write the fields out
     * */
    public void write(DataOutput dataOutput) throws IOException {

        dataOutput.writeUTF(this.phone);
        dataOutput.writeLong(this.upFlow);
        dataOutput.writeLong(this.downFlow);
        dataOutput.writeLong(this.sumFlow);
    }


    /*
     * Deserialization (decoding): read the fields back in the same order
     * */
    public void readFields(DataInput dataInput) throws IOException {

        this.phone = dataInput.readUTF();
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    /*
     * Descending order by total traffic (largest first).
     * Compare the long values, not the Long references.
     * */
    public int compareTo(FlowBean flowBean) {
        return Long.compare(flowBean.sumFlow, this.sumFlow);
    }
}</code>
<code>package com.baizhi.test08;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

public class OwnPartitioner extends Partitioner<FlowBean, Text> {


    private static HashMap<String, Integer> map = new HashMap<String, Integer>();

    static {

        map.put("zz", 0);
        map.put("bj", 1);
        map.put("tj", 2);
        map.put("sh", 3);

    }


    public int getPartition(FlowBean key, Text value, int numPartitions) {

        // The map output value carries the area name
        String areaName = value.toString();

        Integer num = map.get(areaName);

        /*
         * If the area is not in the map, send it to partition 4; otherwise use its mapped partition
         * */
        return num == null ? 4 : num;

    }
}</code>
<code>package com.baizhi.test08;


import com.baizhi.test07.SortReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SPJob {
    public static void main(String[] args) throws Exception {

        //System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
        //
        // conf.addResource("conf2/core-site.xml");
        // conf.addResource("conf2/hdfs-site.xml");
        // conf.addResource("conf2/mapred-site.xml");
        // conf.addResource("conf2/yarn-site.xml");
        // conf.set("mapreduce.app-submission.cross-platform", "true");
        //
        // conf.set(MRJobConfig.JAR, "F:\\大數據\\代碼\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
        //

        Job job = Job.getInstance(conf);

        /*
         * Set the job jar by class
         * */
        job.setJarByClass(SPJob.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        /*
         * Use the custom partitioner
         * */
        job.setPartitionerClass(OwnPartitioner.class);

        // One ReduceTask per partition (4 known areas + 1 for everything else)
        job.setNumReduceTasks(5);


        TextInputFormat.setInputPaths(job, new Path("F:\\大數據\\筆記\\Day02-Hadoop\\數據文件\\sumFlow.txt"));
        //TextInputFormat.setInputPaths(job, new Path("/flow.txt"));
        TextOutputFormat.setOutputPath(job, new Path("F:\\大數據\\筆記\\Day02-Hadoop\\數據文件\\outSP111"));
        //TextOutputFormat.setOutputPath(job, new Path("/out111111"));


        job.setMapperClass(SPMapper.class);
        //job.setReducerClass(SortReducer.class);


        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        //job.setOutputKeyClass(FlowBean.class);
        //job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);

    }
}</code>
<code>package com.baizhi.test08;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
 * The area name is carried in the value so that the partitioner can use it.
 * */
public class SPMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // Split the current line on spaces
        String[] flowinfos = value.toString().split(" ");

        /*
         * Index 0: phone number
         * */
        String phone = flowinfos[0];
        /*
         * Index 1: upload traffic
         * */
        Long upFlow = Long.valueOf(flowinfos[1]);

        /*
         * Index 2: download traffic
         * */
        Long downFlow = Long.valueOf(flowinfos[2]);
        // Index 3: total traffic
        Long sumFlow = Long.valueOf(flowinfos[3]);
        /*
         * Index 4: area name
         * */
        String areaName = flowinfos[4];


        /*
         * Prepare the bean object
         * */
        FlowBean flowBean = new FlowBean();

        flowBean.setPhone(phone);
        flowBean.setUpFlow(upFlow);
        flowBean.setDownFlow(downFlow);
        /*
         * Set the total traffic (upload + download)
         * */
        flowBean.setSumFlow(sumFlow);

        /*
         * Write the record out: the bean is the key (for sorting), the area name is the value (for partitioning)
         * */
        context.write(flowBean, new Text(areaName));

    }
}</code>


Merging Student Records (Reduce-side Join)

Student information (student_info.txt):

<code>gjf 00001
gzy 00002
jzz 00003
zkf 00004</code>

Student course information:

<code>00001 yuwen
00001 shuxue
00002 yinyue
00002 yuwen
00003 tiyu
00003 shengwu
00004 tiyu
00004 wuli</code>

Expected output (one line per student):

<code>00001 gjf yuwen shuxue</code>
<code>package com.baizhi.test09;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MJob {
    public static void main(String[] args) throws Exception {

        //System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
        //
        // conf.addResource("conf2/core-site.xml");
        // conf.addResource("conf2/hdfs-site.xml");
        // conf.addResource("conf2/mapred-site.xml");
        // conf.addResource("conf2/yarn-site.xml");
        // conf.set("mapreduce.app-submission.cross-platform", "true");
        //
        // conf.set(MRJobConfig.JAR, "F:\\大數據\\代碼\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
        //

        Job job = Job.getInstance(conf);

        /*
         * Set the job jar by class
         * */
        job.setJarByClass(MJob.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        /*
         * Use a custom partitioner (not needed here)
         * */
        // job.setPartitionerClass(OwnPartitioner.class);


        // job.setNumReduceTasks(5);


        // The input path is a directory containing both input files
        TextInputFormat.setInputPaths(job, new Path("F:\\大數據\\代碼\\BigData\\Hadoop_Test\\src\\main\\java\\com\\baizhi\\test09\\in"));
        //TextInputFormat.setInputPaths(job, new Path("/flow.txt"));
        TextOutputFormat.setOutputPath(job, new Path("F:\\大數據\\代碼\\BigData\\Hadoop_Test\\src\\main\\java\\com\\baizhi\\test09\\out11"));
        //TextOutputFormat.setOutputPath(job, new Path("/out111111"));


        job.setMapperClass(MMapper.class);
        job.setReducerClass(MReducer.class);


        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);

    }
}</code>


<code>package com.baizhi.test09;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class MMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        /*
         * Use the context to find out which file the current split belongs to
         * */
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        /*
         * Get the file path
         * */
        Path path = fileSplit.getPath();


        /*
         * Split the current line on spaces
         * */
        String[] infos = value.toString().split(" ");

        /*
         * Variable that will hold the map output key
         * */
        String mapkey = "";

        /*
         * Variable that will hold the map output value
         * */
        String mapValue = "";

        /*
         * Decide which file the current line comes from
         * */
        if (path.toString().contains("student_info.txt")) {

            /*
             * The student's name, tagged with "a" to mark it as a name
             * */
            mapValue = infos[0] + " a";

            /*
             * The student id
             * */
            mapkey = infos[1];

        } else {
            /*
             * A course, tagged with "b" to mark it as a course
             * */
            mapValue = infos[1] + " b";


            /*
             * The student id
             * */
            mapkey = infos[0];


            /* (00001,gjf)
               (00001,yuwen)
               (00001,shuxue)
            */

        }


        /*
         * Key: the student id | value: either the name or a course, with its tag
         * */
        context.write(new Text(mapkey), new Text(mapValue));

    }
}</code>


<code>package com.baizhi.test09;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;

public class MReducer extends Reducer<Text, Text, Text, Text> {

    /* (00001,gjf a)
       (00001,yuwen b)
       (00001,shuxue b)
    */

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        /*
         * Variable that will hold the student's name
         * */
        String name = "";

        /*
         * List that will collect the student's courses
         * */
        ArrayList<String> classList = new ArrayList<String>();


        /*
         * Iterate over all records for the current student id
         * */
        for (Text value : values) {

            /*
             * Split on the space between the payload and its tag
             * */
            String[] infos = value.toString().split(" ");
            /*
             * The flag marks whether this record is a name or a course
             * */
            String flag = infos[1];


            /*
             * "a" means this record is the name
             * */
            if (flag.contains("a")) {

                /*
                 * Remember the name
                 * */
                name = infos[0];

            /*
             * Anything else is a course
             * */
            } else {
                /*
                 * Collect the course in the list
                 * */
                classList.add(infos[0]);

            }

        }


        /*
         * Concatenate the collected courses into one string
         * */
        String clasLine = "";
        for (String s : classList) {
            clasLine += s + " ";
        }

        /*
         * Write the final joined record
         * */
        context.write(new Text(key.toString().trim()), new Text(name + " " + clasLine.trim()));
    }
}</code>


MR Optimization Strategies

(1) Adjust the split-calculation logic with CombineTextInputFormat (small-file optimization); see the combined tuning sketch after this list.

<code>CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4 MB</code>

(2) Implement a Partitioner strategy to prevent data skew and balance the load across ReduceTasks.

(3) Tune the YarnChild memory settings as appropriate; consult the YARN configuration reference. This generally means adjusting the vcores and memory parameters to control CPU usage.

(4) Tune the spill buffer size and spill threshold as appropriate.

(5) Tune the merge fan-in for spill files: mapreduce.task.io.sort.factor=10.

(6) Compress the map-side spill output with gzip to save network bandwidth:

<code>conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec",
        GzipCodec.class, CompressionCodec.class);</code>
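
Pulling points (1), (4), (5), and (6) together, here is a minimal driver-side sketch, assuming the standard Hadoop property names shown above; the class name TuningSketch and the numeric values are illustrative, not recommendations:

<code>import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

public class TuningSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // (4) Spill buffer size (MB) and spill threshold
        conf.setInt("mapreduce.task.io.sort.mb", 200);
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);

        // (5) Merge fan-in used when combining spill files
        conf.setInt("mapreduce.task.io.sort.factor", 10);

        // (6) Compress the map output with gzip
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec",
                GzipCodec.class, CompressionCodec.class);

        Job job = Job.getInstance(conf);

        // (1) Pack many small files into splits of at most 4 MB
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4 MB

        // ... set the mapper/reducer classes and the input/output paths as in the drivers above ...
    }
}</code>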

