Big Data Hadoop (Part 2): Complete Notes




The Combiner Component

(1) The Combiner is a component of an MR program distinct from the Mapper and the Reducer.

(2) The parent class of a Combiner is Reducer.

(3) The difference between a Combiner and a Reducer is where they run:

<code>The Combiner runs on each MapTask node and performs a local aggregation.
The Reducer receives the global output of all Mappers.</code>

(4) The purpose of a Combiner is to locally aggregate each MapTask's output so that less data is transferred over the network.

(5) The precondition for using a Combiner is that it must not change the final business result, and the Combiner's output KV types must match the Reducer's input KV types.


Use Cases

A Combiner is not suitable for every kind of job.

<code>1. A Combiner suits accumulative (associative) operations such as summing.
2. Why does it not suit avg() (computing an average)?
Example: take the average of 0, 20, 10, 25, 15. Summing everything and dividing by 5 gives 14 (global average).
{(0+20+10)/3 = 10, first local aggregation} {(25+15)/2 = 20, second local aggregation} {(10+20)/2 = 15, global computation}

Computing directly over the full data does not change the final result, but averaging the local averages does (15 instead of 14),
so a Combiner is not suitable for computing averages.</code>
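
A common pattern for keeping an average combiner-safe (a general technique, not part of the original notes) is to carry a (sum, count) pair through the shuffle and only divide in the final Reducer. A minimal sketch, assuming the Mapper emits each value as the text "value,1"; the package and class names are placeholders:

<code>package com.baizhi.demo; // hypothetical package, for illustration only

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * Combiner for an average job where the Mapper emits (key, "value,1").
 * The partial result stays a (sum, count) pair, so the final Reducer can
 * still compute the exact global average as sum / count.
 */
public class AvgCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        long count = 0;
        for (Text v : values) {
            String[] parts = v.toString().split(",");
            sum += Long.parseLong(parts[0]);   // running sum of values
            count += Long.parseLong(parts[1]); // running count of values
        }
        // Emit a partial (sum, count), NOT a partial average.
        context.write(key, new Text(sum + "," + count));
    }
}</code>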

Usage

(1) Create a CombinerClass that extends Reducer.

(2) In the driver class (Job class), register it on the job object: job.setCombinerClass(); (see the sketch below).
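
A minimal WordCount-style sketch of step (2). The class names (WcCombinerDemo, WcMapper, WcReducer) are placeholders rather than classes from these notes; because summing counts is associative and commutative, the Reducer class itself can double as the Combiner:

<code>package com.baizhi.demo; // hypothetical package, for illustration only

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WcCombinerDemo {

    public static class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String w : value.toString().split(" ")) {
                word.set(w);
                context.write(word, one); // emit (word, 1)
            }
        }
    }

    // Summing is associative, so the same class can serve as Combiner and Reducer.
    public static class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Driver fragment: only the combiner-related settings are shown here.
    public static void configure(Job job) {
        job.setMapperClass(WcMapper.class);
        job.setCombinerClass(WcReducer.class); // local aggregation on each MapTask
        job.setReducerClass(WcReducer.class);  // global aggregation
    }
}</code>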

Example

(1) The WordCount example

(2)job.setCombinerClass();

Before setting the CombinerClass:


After setting the CombinerClass:


The MapReduce Process

First, the client node submits the Job. The InputFormat component splits the input data and feeds records to the map function; once the map phase is done, its output is handed to the reduce function, and the final result is written out.


Shuffle

  • InputFormat: its responsibilities
  • How is the input split?
  • How is the data read?
  • The map function applies user-defined rules to process the data.
  • So that the Reduce side can process the Map results in parallel, the Map output goes through partitioning (partition), sorting (sort), combining (combine) and merging (merge); the resulting intermediate data is then handed to the corresponding Reduce task. This whole process is called Shuffle. Going from unordered data to ordered data, the name "shuffle" is quite fitting.
  • Reduce takes the intermediate results as input, processes them with user-defined rules, and hands the output to OutputFormat.
  • OutputFormat


Shuffle is the core of MR; it describes the journey of data from the MapTask output to the ReduceTask input.

In a real production Hadoop cluster, most MapTasks and ReduceTasks do not run on the same node, so a ReduceTask has to fetch intermediate results from other nodes over the network. When the cluster runs many Jobs at once, this normal ReduceTask behavior can consume a lot of the cluster's network bandwidth. The consumption is normal and unavoidable, but we can take measures to reduce the amount of data transferred (for example, by using a Combiner).

Another point: disk I/O also has a large impact on how efficiently a Job runs.

From the analysis above, the basic requirements on the Shuffle process are:

  • Pull the data from the MapTask side to the ReduceTask side completely.
  • While pulling the data, consume as little network bandwidth as possible (through deliberate tuning).
  • Minimize the impact of disk I/O on task execution.


Summary: Shuffle is the process of taking the Map output through partitioning (partition), sorting (sort), combining (combine) and merging (merge), and handing the result to Reduce.

Map-side Shuffle

(1) Map results are written into an in-memory buffer.

(2) When the buffer reaches its threshold, the data is spilled to disk.

(3) The spill files are sorted within each partition, combined, and merged into one large file of (key, value-list) pairs. (A configuration sketch for the spill buffer follows below.)
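
A small configuration sketch for the spill buffer mentioned in steps (1) and (2). The property names are the standard Hadoop MapReduce ones and the values shown are the usual defaults; these lines would go in the driver, on the same Configuration object that is passed to Job.getInstance(conf):

<code>// In the driver, before Job.getInstance(conf):
Configuration conf = new Configuration();
conf.setInt("mapreduce.task.io.sort.mb", 100);         // in-memory sort/spill buffer size, in MB (default 100)
conf.set("mapreduce.map.sort.spill.percent", "0.80");  // buffer usage ratio that triggers a spill (default 0.80)</code>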


Reduce-side Shuffle

(1) Fetch the data from the Map side.

(2) Merge the data.

(3) Feed the merged data to the Reduce task. (The commonly tuned fetch/merge knobs are sketched below.)
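
For the fetch and merge steps, the commonly tuned settings look like the following sketch (standard Hadoop property names, default values shown; set them on the driver's Configuration before creating the Job):

<code>conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 5); // fetcher threads copying map output in parallel (default 5)
conf.setInt("mapreduce.task.io.sort.factor", 10);          // number of spill files merged in one pass (default 10)</code>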



Programming Examples


Sorted Output

Data

<code>15117826008 83 218 301 zz
15617126008 83 218 302 zz
16519756181 150 164 315 tj
16519756281 150 164 314 tj
18611781131 123 112 237 sh
18611781113 123 112 235 sh
18711671123 45 186 232 bj
18711671133 45 186 233 bj
18711671233 45 186 234 tw
18711631233 45 186 235 xy</code>


<code>package com.baizhi.test07;

import com.baizhi.test06.OwnPartitioner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SortJob {

    public static void main(String[] args) throws Exception {

        //System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
        //
        // conf.addResource("conf2/core-site.xml");
        // conf.addResource("conf2/hdfs-site.xml");
        // conf.addResource("conf2/mapred-site.xml");
        // conf.addResource("conf2/yarn-site.xml");
        // conf.set("mapreduce.app-submission.cross-platform", "true");
        //
        // conf.set(MRJobConfig.JAR, "F:\\大数据\\代码\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
        //

        Job job = Job.getInstance(conf);

        /*
         * Set the class used to locate the job jar
         * */
        job.setJarByClass(SortJob.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        /*
         * Use a custom partitioning rule
         * */
        // job.setPartitionerClass(OwnPartitioner.class);


        // job.setNumReduceTasks(1);


        TextInputFormat.setInputPaths(job, new Path("F:\\大数据\\笔记\\Day02-Hadoop\\数据文件\\sumFlow.txt"));
        //TextInputFormat.setInputPaths(job, new Path("/flow.txt"));
        TextOutputFormat.setOutputPath(job, new Path("F:\\大数据\\笔记\\Day02-Hadoop\\数据文件\\outS111"));
        //TextOutputFormat.setOutputPath(job, new Path("/out111111"));


        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);


        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);


        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);

        job.waitForCompletion(true);
    }
}</code>
<code>package com.baizhi.test07;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line on spaces
        String[] flowinfos = value.toString().split(" ");

        /*
         * Index 0: phone number
         * */
        String phone = flowinfos[0];
        /*
         * Index 1: upload traffic
         * */
        Long upFlow = Long.valueOf(flowinfos[1]);

        /*
         * Index 2: download traffic
         * */
        Long downFlow = Long.valueOf(flowinfos[2]);
        // Index 3: total traffic
        Long sumFlow = Long.valueOf(flowinfos[3]);

        /*
         * Prepare the bean object
         * */
        FlowBean flowBean = new FlowBean();

        flowBean.setPhone(phone);
        flowBean.setUpFlow(upFlow);
        flowBean.setDownFlow(downFlow);
        /*
         * Set the total traffic (upload + download)
         * */
        flowBean.setSumFlow(sumFlow);

        /*
         * Write out the record; the FlowBean key drives the shuffle sort
         * */
        context.write(flowBean, NullWritable.get());
    }
}</code>
<code>package com.baizhi.test07;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable> {
    @Override
    protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Keys arrive already sorted by FlowBean.compareTo(); just write them out
        context.write(key, NullWritable.get());
    }
}</code>


<code>package com.baizhi.test07;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private String phone;
    private Long upFlow;
    private Long downFlow;
    private Long sumFlow;

    public FlowBean() {
    }

    public FlowBean(String phone, Long upFlow, Long downFlow, Long sumFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public Long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }

    public Long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Long downFlow) {
        this.downFlow = downFlow;
    }

    public Long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(Long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return "FlowBean{" +
                "phone='" + phone + '\'' +
                ", upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
    }

    /*
     * Serialization (encoding): write the fields out
     * */
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.phone);
        dataOutput.writeLong(this.upFlow);
        dataOutput.writeLong(this.downFlow);
        dataOutput.writeLong(this.sumFlow);
    }

    /*
     * Deserialization (decoding): read the fields back in the same order
     * */
    public void readFields(DataInput dataInput) throws IOException {
        this.phone = dataInput.readUTF();
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    // Ascending order by total traffic; compare the Long values instead of
    // using == (which would compare object references)
    public int compareTo(FlowBean flowBean) {
        return this.sumFlow.compareTo(flowBean.sumFlow);
    }
}</code>


Sorted Output with Partitioning

Data

<code>15117826008 83 218 301 zz
15617126008 83 218 302 zz
16519756181 150 164 315 tj
16519756281 150 164 314 tj
18611781131 123 112 237 sh
18611781113 123 112 235 sh
18711671123 45 186 232 bj
18711671133 45 186 233 bj
18711671233 45 186 234 tw
18711631233 45 186 235 xy</code>




<code>package com.baizhi.test08;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private String phone;
    private Long upFlow;
    private Long downFlow;
    private Long sumFlow;

    public FlowBean() {
    }

    public FlowBean(String phone, Long upFlow, Long downFlow, Long sumFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public Long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }

    public Long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Long downFlow) {
        this.downFlow = downFlow;
    }

    public Long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(Long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return "FlowBean{" +
                "phone='" + phone + '\'' +
                ", upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
    }

    /*
     * Serialization (encoding): write the fields out
     * */
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.phone);
        dataOutput.writeLong(this.upFlow);
        dataOutput.writeLong(this.downFlow);
        dataOutput.writeLong(this.sumFlow);
    }

    /*
     * Deserialization (decoding): read the fields back in the same order
     * */
    public void readFields(DataInput dataInput) throws IOException {
        this.phone = dataInput.readUTF();
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    // Descending order by total traffic; compare the Long values instead of
    // using == (which would compare object references)
    public int compareTo(FlowBean flowBean) {
        return flowBean.sumFlow.compareTo(this.sumFlow);
    }
}</code>
<code>package com.baizhi.test08;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

public class OwnPartitioner extends Partitioner<FlowBean, Text> {

    private static HashMap<String, Integer> map = new HashMap<String, Integer>();

    static {
        map.put("zz", 0);
        map.put("bj", 1);
        map.put("tj", 2);
        map.put("sh", 3);
    }

    public int getPartition(FlowBean key, Text value, int numPartitions) {

        // The value carries the area name
        String areaName = value.toString();

        Integer num = map.get(areaName);

        /*
         * If the area is not in the map, send the record to partition 4;
         * otherwise use the mapped partition number
         * */
        return num == null ? 4 : num;
    }
}</code>
<code>package com.baizhi.test08;

import com.baizhi.test07.SortReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SPJob {
    public static void main(String[] args) throws Exception {

        //System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
        //
        // conf.addResource("conf2/core-site.xml");
        // conf.addResource("conf2/hdfs-site.xml");
        // conf.addResource("conf2/mapred-site.xml");
        // conf.addResource("conf2/yarn-site.xml");
        // conf.set("mapreduce.app-submission.cross-platform", "true");
        //
        // conf.set(MRJobConfig.JAR, "F:\\大数据\\代码\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
        //

        Job job = Job.getInstance(conf);

        /*
         * Set the class used to locate the job jar
         * */
        job.setJarByClass(SPJob.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        /*
         * Use the custom partitioning rule
         * */
        job.setPartitionerClass(OwnPartitioner.class);

        // One ReduceTask per partition: zz, bj, tj, sh plus the "other" partition
        job.setNumReduceTasks(5);

        TextInputFormat.setInputPaths(job, new Path("F:\\大数据\\笔记\\Day02-Hadoop\\数据文件\\sumFlow.txt"));
        //TextInputFormat.setInputPaths(job, new Path("/flow.txt"));
        TextOutputFormat.setOutputPath(job, new Path("F:\\大数据\\笔记\\Day02-Hadoop\\数据文件\\outSP111"));
        //TextOutputFormat.setOutputPath(job, new Path("/out111111"));

        job.setMapperClass(SPMapper.class);
        //job.setReducerClass(SortReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        //job.setOutputKeyClass(FlowBean.class);
        //job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
    }
}</code>
<code>package com.baizhi.test08;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
 * The area name is carried in the value so the partitioner can use it
 * */
public class SPMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // Split the line on spaces
        String[] flowinfos = value.toString().split(" ");

        /*
         * Index 0: phone number
         * */
        String phone = flowinfos[0];
        /*
         * Index 1: upload traffic
         * */
        Long upFlow = Long.valueOf(flowinfos[1]);

        /*
         * Index 2: download traffic
         * */
        Long downFlow = Long.valueOf(flowinfos[2]);
        // Index 3: total traffic
        Long sumFlow = Long.valueOf(flowinfos[3]);
        /*
         * Index 4: area name
         * */
        String areaName = flowinfos[4];

        /*
         * Prepare the bean object
         * */
        FlowBean flowBean = new FlowBean();

        flowBean.setPhone(phone);
        flowBean.setUpFlow(upFlow);
        flowBean.setDownFlow(downFlow);
        /*
         * Set the total traffic (upload + download)
         * */
        flowBean.setSumFlow(sumFlow);

        /*
         * Write out the record: FlowBean key (for sorting), area name value (for partitioning)
         * */
        context.write(flowBean, new Text(areaName));
    }
}</code>


Student Record Merging

Student information

<code>gjf 00001
gzy 00002
jzz 00003
zkf 00004</code>

Student course information

<code>00001 yuwen
00001 shuxue
00002 yinyue
00002 yuwen
00003 tiyu
00003 shengwu
00004 tiyu
00004 wuli</code>

Expected output (the two files are joined on the student id: the Mapper tags each value with its source file, and the Reducer assembles the name and course list per id):

<code>00001 gjf yuwen shuxue</code>
<code>package com.baizhi.test09;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MJob {
    public static void main(String[] args) throws Exception {

        //System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
        //
        // conf.addResource("conf2/core-site.xml");
        // conf.addResource("conf2/hdfs-site.xml");
        // conf.addResource("conf2/mapred-site.xml");
        // conf.addResource("conf2/yarn-site.xml");
        // conf.set("mapreduce.app-submission.cross-platform", "true");
        //
        // conf.set(MRJobConfig.JAR, "F:\\大数据\\代码\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
        //

        Job job = Job.getInstance(conf);

        /*
         * Set the class used to locate the job jar
         * */
        job.setJarByClass(MJob.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        /*
         * Use a custom partitioning rule
         * */
        // job.setPartitionerClass(OwnPartitioner.class);


        // job.setNumReduceTasks(5);


        TextInputFormat.setInputPaths(job, new Path("F:\\大数据\\代码\\BigData\\Hadoop_Test\\src\\main\\java\\com\\baizhi\\test09\\in"));
        //TextInputFormat.setInputPaths(job, new Path("/flow.txt"));
        TextOutputFormat.setOutputPath(job, new Path("F:\\大数据\\代码\\BigData\\Hadoop_Test\\src\\main\\java\\com\\baizhi\\test09\\out11"));
        //TextOutputFormat.setOutputPath(job, new Path("/out111111"));

        job.setMapperClass(MMapper.class);
        job.setReducerClass(MReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
    }
}</code>




<code>package com.baizhi.test09;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class MMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        /*
         * Get the split (and thus the file) currently being read from the context
         * */
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        /*
         * Get the file path
         * */
        Path path = fileSplit.getPath();

        /*
         * Split the current line on spaces
         * */
        String[] infos = value.toString().split(" ");

        /*
         * Variable holding the map output key
         * */
        String mapkey = "";

        /*
         * Variable holding the map output value
         * */
        String mapValue = "";

        /*
         * Determine which file the current line comes from
         * */
        if (path.toString().contains("student_info.txt")) {

            /*
             * Student name; append "a" to flag the value as a name
             * */
            mapValue = infos[0] + " a";

            /*
             * Student id
             * */
            mapkey = infos[1];

        } else {
            /*
             * Course; append "b" to flag the value as a course
             * */
            mapValue = infos[1] + " b";

            /*
             * Student id
             * */
            mapkey = infos[0];

            /* (00001,gjf)
               (00001,yuwen)
               (00001,shuxue)
            */
        }

        /*
         * Output: student id as the key; the value is either the name or a course
         * */
        context.write(new Text(mapkey), new Text(mapValue));
    }
}</code>


<code>package com.baizhi.test09;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;

public class MReducer extends Reducer<Text, Text, Text, Text> {

    /* (00001,gjf a)
       (00001,yuwen b)
       (00001,shuxue b)
    */

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        /*
         * Variable to hold the student name
         * */
        String name = "";

        /*
         * List to collect the courses
         * */
        ArrayList<String> classList = new ArrayList<String>();

        /*
         * Iterate over all records for the current student id
         * */
        for (Text value : values) {

            /*
             * Split on spaces
             * */
            String[] infos = value.toString().split(" ");
            /*
             * Get the flag that marks whether this record is a name or a course
             * */
            String flag = infos[1];

            /*
             * Flag "a" means it is the student name
             * */
            if (flag.contains("a")) {

                /*
                 * Assign the value to name
                 * */
                name = infos[0];

            /*
             * Anything else is a course
             * */
            } else {
                /*
                 * Collect the course in the list
                 * */
                classList.add(infos[0]);
            }
        }

        /*
         * Join the collected courses into a single string
         * */
        String classLine = "";
        for (String s : classList) {
            classLine += s + " ";
        }

        /*
         * Write the final record: id, name, courses
         * */
        context.write(new Text(key.toString().trim()), new Text(name + " " + classLine.trim()));
    }
}</code>


MR Optimization Strategies

(1) Adjust the split-computation logic with CombineTextInputFormat (small-file optimization):

<code>CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4 MB</code>
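
The one-liner above only caps the split size; a slightly fuller driver sketch (assuming the same Job object as in the drivers earlier in these notes) also switches the job's InputFormat so that many small files land in one split:

<code>// import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
job.setInputFormatClass(CombineTextInputFormat.class);     // pack many small files into one split
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // at most 4 MB per split</code>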

(2) Implement a Partitioner strategy (like the OwnPartitioner example above) to prevent data skew and balance the load across ReduceTasks.

(3) Tune the YarnChild memory parameters appropriately (consult the YARN configuration reference); this usually means adjusting the container memory and vcores settings, for example as sketched below.
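
A hedged example of what such tuning can look like; the property names are standard MapReduce/YARN settings, but the values shown are purely illustrative and must be matched to the cluster's actual container limits:

<code>conf.setInt("mapreduce.map.memory.mb", 2048);        // container memory for each MapTask
conf.set("mapreduce.map.java.opts", "-Xmx1638m");    // JVM heap inside the map container
conf.setInt("mapreduce.map.cpu.vcores", 1);          // vcores per MapTask
conf.setInt("mapreduce.reduce.memory.mb", 4096);     // container memory for each ReduceTask
conf.set("mapreduce.reduce.java.opts", "-Xmx3276m"); // JVM heap inside the reduce container
conf.setInt("mapreduce.reduce.cpu.vcores", 1);       // vcores per ReduceTask</code>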

(4) Tune the spill buffer size and spill threshold appropriately (see the spill-buffer configuration sketch in the Map-side Shuffle section).

(5) Tune the merge fan-in, i.e. how many files are merged at once: mapreduce.task.io.sort.factor (default 10).

(6) Compress the Map-side spill/output files with gzip to save network bandwidth:

<code>conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec",
        GzipCodec.class, CompressionCodec.class);</code>


大数据 Hadoop(下) 笔记大全 收藏加关注


