MapReduce詳解

MapReduce是什麼

MapReduce是一種分佈式計算編程框架,是Hadoop主要組成部分之一,可以讓用戶專注於編寫核心邏輯代碼,最後以高可靠、高容錯的方式在大型集群上並行處理大量數據。

MapReduce的存儲

MapReduce的數據是存儲在HDFS上的,HDFS也是Hadoop的主要組成部分之一。下邊是MapReduce在HDFS上的存儲的圖解

MapReduce詳解

HDFS主要有Namenode和Datanode兩部分組成,整個集群有一個Namenode和多個DataNode,通常每一個節點一個DataNode,Namenode的主要功能是用來管理客戶端client對數據文件的操作請求和儲存數據文件的地址。DataNode主要是用來儲存和管理本節點的數據文件。節點內部數據文件被分為一個或多個block塊(block默認大小原來是64MB,後來變為128MB),然後這些塊儲存在一組DataNode中。(這裡不對HDFS做過多的介紹,後續會寫一篇詳細的HDFS筆記)

MapReduce的運行流程

MapReduce詳解

MapReduce詳解

1、首先把需要處理的數據文件上傳到HDFS上,然後這些數據會被分為好多個小的分片,然後每個分片對應一個map任務,推薦情況下分片的大小等於block塊的大小。然後map的計算結果會暫存到一個內存緩衝區內,該緩衝區默認為100M,等緩存的數據達到一個閾值的時候,默認情況下是80%,然後會在磁盤創建一個文件,開始向文件裡邊寫入數據。

2、map任務的輸入數據的格式是對的形式,我們也可以自定義自己的

類型。然後map在往內存緩衝區裡寫入數據的時候會根據key進行排序,同樣溢寫到磁盤的文件裡的數據也是排好序的,最後map任務結束的時候可能會產生多個數據文件,然後把這些數據文件再根據歸併排序合併成一個大的文件。

3、然後每個分片都會經過map任務後產生一個排好序的文件,同樣文件的格式也是對的形式,然後通過對key進行hash的方式把數據分配到不同的reduce裡邊去,這樣對每個分片的數據進行hash,再把每個分片分配過來的數據進行合併,合併過程中也是不斷進行排序的。最後數據經過reduce任務的處理就產生了最後的輸出。

4、在我們開發中只需要對中間map和reduce的邏輯進行開發就可以了,中間分片,排序,合併,分配都有MapReduce框架幫我完成了。

MapReduce的資源調度系統

最後我們來看一下MapReduce的資源調度系統Yarn。

MapReduce詳解

Yarn的基本思想是將資源管理和作業調度/監視的功能分解為單獨的守護進程。全局唯一的ResourceManager是負責所有應用程序之間的資源的調度和分配,每個程序有一個ApplicationMaster,ApplicationMaster實際上是一個特定於框架的庫,其任務是協調來自ResourceManager的資源,並與NodeManager一起執行和監視任務。NodeManager是每臺機器框架代理,監視其資源使用情況(CPU,內存,磁盤,網絡)並將其報告給ResourceManager。

WordConut代碼

  • python實現

map.py


#!/usr/bin/env python
# -*- coding:UTF-8 -*-
import sys

for line in sys.stdin:
words = line.strip().split()
for word in words:
print('%s\\t%s' % (word, 1))

reduce.py


#!/usr/bin/env python
# -*- coding:UTF-8 -*-
import sys

current_word = None
sum = 0

for line in sys.stdin:

word, count = line.strip().split(' ')

if current_word == None:
current_word = word

if word != current_word:
print('%s\\t%s' % (current_word, sum))
current_word = word
sum = 0

sum += int(count)

print('%s\\t%s' % (current_word, sum))

我們先把輸入文件上傳到HDFS上去


hadoop fs -put /input.txt /

​然後在Linux下運行,為了方便我們把命令寫成了shell文件


HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_FILE_PATH="/input.txt"
OUTPUT_FILE_PATH="/output"

$HADOOP_CMD fs -rmr -skipTrush $OUTPUT_FILE_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \\
-input $INPUT_FILE_PATH \\
-output $OUTPUT_FILE_PATH \\
-mapper "python map.py" \\
-reducer "python reduce.py" \\
-file "./map.py" \\
-file "./reduce.py"

  • java實現

MyMap.java


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMap extends Mapper<longwritable> {

private IntWritable one = new IntWritable(1);
private Text text = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");

for (String word: words){
text.set(word);
context.write(text,one);
}
}
}

/<longwritable>

MyReduce.java


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReduce extends Reducer<text> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<intwritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable i:values){
sum+=i.get();
}
result.set(sum);
context.write(key,result);
}
}

/<intwritable>/<text>

WordCount.java


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCount {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, "WordCount");
job.setJarByClass(WordCount.class);
job.setMapperClass(MyMap.class);
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

把工程打成jar包,然後把jar包和輸入文件上傳到HDfs


$ hadoop fs -put /wordcount.jar /
$ hadoop fs -put /input.txt /

執行wordcount任務


$ bin/hadoop jar wordcount.jar WordCount /input.txt /user/joe/wordcount/output


分享到:


相關文章: