The five core components:
All five components listed below are programmable and can be configured to suit the user's needs:
- InputFormat component
- Mapper component
- Partitioner component
- Reducer component
- OutputFormat component
A component that is not required:
- Combiner
It is used to optimize job performance and may be used as long as it does not affect the final business result.
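As a quick illustration (this fragment is not from the jobs in this post, and WordCountMapper/WordCountReducer are made-up names), a Combiner is normally enabled in the driver with job.setCombinerClass(...); when the reduce logic is an associative, commutative aggregation such as a sum, the Reducer class itself can double as the Combiner:

```java
// Hypothetical word-count style driver fragment: the reduce logic is a plain sum,
// so the same Reducer class can also act as the Combiner (map-side pre-aggregation).
// This only reduces the amount of data shuffled; the final result is unchanged.
job.setMapperClass(WordCountMapper.class);
job.setCombinerClass(WordCountReducer.class);   // optional optimization
job.setReducerClass(WordCountReducer.class);
```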
InputFormat component
Input component: the InputFormat component has two responsibilities
- Splitting the data: divide the input data into a number of Splits according to some rule, which determines the number of MapTasks and the Split each one handles
- Feeding the Mapper: given a Split, parse it into key/value pairs one by one
(1) What is a split?
(2) How is the input split? (key point)
We are testing locally here, so blockSize and splitSize are both 32 MB.
Package: org.apache.hadoop.mapreduce.lib.input
Class FileInputFormat, method getSplits (lines 221-276)
```java
protected boolean isSplitable(JobContext context, Path filename) {
    // By default a file can be split
    return true;
}

// Method that computes the split size
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    // Locally blockSize is 32 MB, so this returns 32 MB
    return Math.max(minSize, Math.min(maxSize, blockSize));
}

public List<InputSplit> getSplits(JobContext job) throws IOException {
    // ... (lines 221-276: computes the splits for every input file)
}
```
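To make the slicing rule in getSplits easier to follow, here is a simplified paraphrase of its core loop (sliceFile is a made-up helper name; the real method also tracks block locations and hosts): splitSize is computed per file, and a new split is cut as long as the remaining bytes exceed SPLIT_SLOP (1.1) times splitSize, so the last split may be up to 10% larger than splitSize.

```java
import java.util.ArrayList;
import java.util.List;

class SplitSketch {
    // Allow the last split to be up to 10% larger instead of creating a tiny extra split
    private static final double SPLIT_SLOP = 1.1;

    // Simplified paraphrase of the loop in FileInputFormat.getSplits() (Hadoop 2.6);
    // each long[] is an {offset, length} pair describing one split of a single file.
    static List<long[]> sliceFile(long fileLength, long blockSize, long minSize, long maxSize) {
        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize)); // 32 MB locally
        List<long[]> splits = new ArrayList<long[]>();
        long bytesRemaining = fileLength;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[]{fileLength - bytesRemaining, splitSize});
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            // The remaining tail becomes the last split
            splits.add(new long[]{fileLength - bytesRemaining, bytesRemaining});
        }
        return splits;
    }
}
```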
The error shown here occurs because the virtual machine environment has no MySQL setup (the MySQL JAR is missing).
Fix: copy the MySQL JAR into the /home/hadoop/hadoop-2.6.0/share/hadoop/yarn/ directory.
Cross-platform submission
This is basically the same as before:

```java
System.setProperty("HADOOP_USER_NAME", "root");

configuration.addResource("conf2/core-site.xml");
configuration.addResource("conf2/hdfs-site.xml");
configuration.addResource("conf2/mapred-site.xml");
configuration.addResource("conf2/yarn-site.xml");

configuration.set("mapreduce.app-submission.cross-platform", "true");
configuration.set(MRJobConfig.JAR, "F:\\大數據\\代碼\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");
```

Custom InputFormat
Solving small-file storage
Concepts involved: custom InputFormat, SequenceFileOutputFormat
Approach: first read the data in with a custom InputFormat (each small file is handled in a single pass, and its entire content is written into one large file), then write the result with SequenceFileOutputFormat. A SequenceFile is Hadoop's file format for storing key-value pairs in binary form:
- key: the file's path + name
- value: the binary content of the file

```java
package com.baizhi.test05;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class FileJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        // Set the class loader (the jar that contains this job)
        job.setJarByClass(FileJob.class);

        // Use the custom InputFormat and write the result as a SequenceFile
        job.setInputFormatClass(OwnInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        OwnInputFormat.setInputPaths(job, new Path("F:\\大數據\\代碼\\BigData\\Hadoop_Test\\src\\main\\java\\com\\baizhi\\test05\\in"));
        SequenceFileOutputFormat.setOutputPath(job, new Path("F:\\大數據\\代碼\\BigData\\Hadoop_Test\\src\\main\\java\\com\\baizhi\\test05\\out01"));

        job.setMapperClass(FileMapper.class);
        job.setReducerClass(FileReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        job.waitForCompletion(true);
    }
}
```
```java
package com.baizhi.test05;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // key = file path + name, value = the file's binary content; pass them straight through
        context.write(key, value);
    }
}
```

```java
package com.baizhi.test05;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        for (BytesWritable value : values) {
            context.write(key, value);
        }
    }
}
```

```java
package com.baizhi.test05;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class OwnInputFormat extends FileInputFormat<Text, BytesWritable> {

    // Can the file be split? No: each small file must be read as a whole
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        OwnRecordReader recordReader = new OwnRecordReader();
        recordReader.initialize(inputSplit, taskAttemptContext);
        return recordReader;
    }
}
```

```java
package com.baizhi.test05;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class OwnRecordReader extends RecordReader<Text, BytesWritable> {

    // key: the file's path + name
    Text key = new Text();
    // value: the file's binary content
    BytesWritable value = new BytesWritable();
    FileSplit fileSplit;
    Configuration configuration;
    boolean isProgress = true;

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Remember the split and get the Configuration object from the TaskAttemptContext
        fileSplit = (FileSplit) inputSplit;
        configuration = taskAttemptContext.getConfiguration();
    }

    // nextKeyValue() fills in the key and the value; each file yields exactly one record
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (isProgress) {
            // Path of the current file
            Path path = fileSplit.getPath();
            // File system object
            FileSystem fileSystem = path.getFileSystem(configuration);

            // The key is the file path + name
            key.set(path.toString());

            // Open the file and read its whole content into a byte array: this is the value
            FSDataInputStream fsDataInputStream = fileSystem.open(path);
            byte[] bytes = new byte[(int) fileSplit.getLength()];
            // Copy directly with IOUtils
            IOUtils.readFully(fsDataInputStream, bytes, 0, bytes.length);
            value.set(bytes, 0, bytes.length);

            // Close the stream
            IOUtils.closeStream(fsDataInputStream);

            isProgress = false;
            return true;
        }
        return false;
    }

    public Text getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    public void close() throws IOException {
    }
}
```

A couple of small follow-up questions:
(1) Can this problem be solved in some other way?
(2) Try reading back the contents of the SequenceFile.
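For question (2), here is a minimal sketch of reading the SequenceFile back with SequenceFile.Reader (the class name ReadSeqFile and the part-r-00000 file name are assumptions, not taken from the original code):

```java
package com.baizhi.test05;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class ReadSeqFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // One output file produced by the job above (assumed name: part-r-00000)
        Path path = new Path("F:\\大數據\\代碼\\BigData\\Hadoop_Test\\src\\main\\java\\com\\baizhi\\test05\\out01\\part-r-00000");

        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
        Text key = new Text();                     // file path + name
        BytesWritable value = new BytesWritable(); // file content
        while (reader.next(key, value)) {
            System.out.println(key + " -> " + value.getLength() + " bytes");
        }
        reader.close();
    }
}
```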
CombineTextInputFormat
Its main purpose is to optimize computation over small files.
With TextInputFormat, there are as many splits as there are input files; CombineTextInputFormat, by contrast, can pack multiple small files into a single split.
```java
job.setInputFormatClass(CombineTextInputFormat.class);

CombineTextInputFormat.setInputPaths(job, new Path("F:\\大數據\\代碼\\BigData\\Hadoop_Test\\src\\main\\java\\com\\baizhi\\test05\\in"));
CombineTextInputFormat.setMinInputSplitSize(job, 10240000);
```

The relationship between splits and MapTasks
The number of MapTasks that run in parallel is determined by the number of splits; the number of ReduceTasks can be set manually and defaults to 1.
Partitioner component
Why are records with the same key assigned to the same ReduceTask?
Package org.apache.hadoop.mapreduce.lib.partition, class HashPartitioner, method getPartition:
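Its implementation is essentially the following (comments added): the key's hash code, taken modulo the number of ReduceTasks, picks the partition, so identical keys always map to the same partition and therefore to the same ReduceTask.

```java
package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {

    public int getPartition(K key, V value, int numReduceTasks) {
        // Same key -> same hashCode -> same partition -> same ReduceTask;
        // & Integer.MAX_VALUE keeps the result non-negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```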
Requirement: use a custom Partitioner so that flow records from different regions (zz, bj, tj, sh) are written to separate output files.
```java
package com.baizhi.test06;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

public class OwnPartitioner extends Partitioner<Text, Text> {

    // Map that stores the region name -> partition number mapping
    private static HashMap<String, Integer> map = new HashMap<String, Integer>();

    static {
        map.put("zz", 0);
        map.put("bj", 1);
        map.put("tj", 2);
        map.put("sh", 3);
    }

    public int getPartition(Text key, Text value, int numReduceTasks) {
        // The key is the region name
        String areaName = key.toString();
        // Look up its partition number
        Integer num = map.get(areaName);
        // Unknown regions go to partition 4; otherwise return the mapped partition
        return num == null ? 4 : num;
    }
}
```

Summary:
- The number of ReduceTasks (NumReduceTasks) must be greater than or equal to the number of (custom) partitions.
- By default the number of ReduceTasks is 1; our tests show that in this case all data ends up in a single partition no matter how you partition manually (in other words, with one ReduceTask a custom partitioning strategy has no effect).
- We also observed that there are as many output files as there are ReduceTasks.
The complete job that goes with the OwnPartitioner above (driver, Mapper, and Reducer):

```java
package com.baizhi.test06;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class PJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        // Set the class loader (the jar that contains this job)
        job.setJarByClass(PJob.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Use the custom partitioning rule
        job.setPartitionerClass(OwnPartitioner.class);
        // Set the number of ReduceTasks (>= the number of partitions)
        job.setNumReduceTasks(5);

        TextInputFormat.setInputPaths(job, new Path("F:\\大數據\\筆記\\Day02-Hadoop\\數據文件\\sumFlow.txt"));
        //TextInputFormat.setInputPaths(job, new Path("/flow.txt"));
        TextOutputFormat.setOutputPath(job, new Path("F:\\大數據\\筆記\\Day02-Hadoop\\數據文件\\outP1111111"));
        //TextOutputFormat.setOutputPath(job, new Path("/out111111"));

        job.setMapperClass(PMapper.class);
        job.setReducerClass(PReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
    }
}
```

```java
package com.baizhi.test06;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
 * Things this example looks at:
 * - the relationship between the number of ReduceTasks and the number of output files
 * - the relationship between ReduceTasks and partitions
 */
public class PMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Output key = region name (5th field of the record), value = the whole flow record
        String areaName = value.toString().split(" ")[4];
        context.write(new Text(areaName), value);
    }
}
```

```java
package com.baizhi.test06;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PReducer extends Reducer<Text, Text, NullWritable, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(NullWritable.get(), value);
        }
    }
}
```

Note: in the code above we did not pay attention to any business logic; we simply wrote the records of different regions to different output files through the custom partitioning code, and we also set the number of ReduceTasks.