第一章:MapReduce概述

Mapredce定義:

MapReduce是一個分佈式運算程序的編程框架,是用戶開發(基於Hadoop的數據分析應用)的核心框架。

MapReduce的核心功能是將用戶編寫的業務邏輯代碼與自帶的默認組件整合成一個完整的分佈式運算程序,併發運行在一個Hadoop集群上。

MapReduce優缺點

1、優點

1.1、易於編程

1.2、良好的擴張性

1.3、高容錯

  • 其中一臺機器掛掉,它上面運行的計算任務會自動轉移到另外的機器上運行,不需要人工干預

1.4、適用PB級以上海量數據的離線處理

2、缺點

2.1、不擅長實時計算

2.2、不擅長流式計算

2.3、不擅長DAG(有向圖)計算

  • 多個應用存在依賴關係,後一個應用的輸入是前一個應用的輸出,在這種情況下,MapReduce不是不能做,而是每個MapReduce的輸出結果都會 寫入到磁盤,會造成大量的磁盤IO,性能低下

MapReduce 進程

  • MrAppMaster:負責整個程序的過程調度和狀態協調
  • MapTask:負責Map階段的數據處理流程
  • ReduceTask:負責Reduce階段數據處理流程

常用的數據序列化類型

Java類型 Hadoop Writable類型

boolean BooleanWritable

byte ByteWritable

int IntWritable

float FloatWritable

long LongWritable

double DoubleWritable

String Text

map MapWritable

array ArrayWritable

MapReduce 編程規範

  • Mapper
    1. 用戶自定義的Mapper 繼承父類,重寫父類的map() 方法
    2. Mapper 中業務邏輯寫在map() 方法內
    3. Mapper 的輸入類型和輸出類型都是 K V對的形式,
  • Reduce
    1. 用戶自定義的Reduce 繼承父類,重寫父類的reduce() 方法
    2. Reduce 業務邏輯寫在reduce() 方法內
    3. Reduce的輸入類型,輸出類型都是K V對的形式
    4. Reduce對每組k 調用一次reduce() 方法
  • Driver
    1. 相當於yarn集群的客戶端,用於提交整個程序到yarn 集群,提交時封裝了MapReduce程序相關運行參數的job對象

wordCount演練

  • 需求:統計文件中每個單詞出現的次數,文件存儲在HDFS中,路徑和內容如下:
<code>[root@bbx hadoop-3.1.3]# bin/hdfs dfs -ls -R /home/input/
-rw-r--r--   1 root supergroup         30 2020-05-02 17:05 /home/input/name
[root@bbx hadoop-3.1.3]# bin/hdfs dfs -cat /home/input/name
2020-05-04 16:47:12,295 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
aa bb ss
aa cc dd
bb
cc
dd
ee/<code>
  • maven依賴
<code>

    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.2.6.RELEASE
          
    
    com.bbx
    wcdemo
    0.0.1-SNAPSHOT
    wcdemo
    Demo project for Spring Boot

    
        1.8
    

    
        
            org.springframework.boot
            spring-boot-starter
            
                
                    org.springframework.boot
                    spring-boot-starter-logging
                
            
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
            
                
                    org.junit.vintage
                    junit-vintage-engine
                
            
        

        
            org.apache.hadoop
            hadoop-common
            3.1.3
            
                
                    slf4j-log4j12
                    org.slf4j
                
                
                    com.google.code.gson
                    gson
                
            
        

        
            org.apache.hadoop
            hadoop-client
            3.1.3
        

         
        
            org.apache.hadoop
            hadoop-hdfs
            3.1.3
            
                
                    slf4j-log4j12
                    org.slf4j
                
            
        
    

    
        
             
            
                maven-compiler-plugin
                
                    1.8
                    1.8
                
            
            
                maven-assembly-plugin
                
                    
                        jar-with-dependencies
                    
                    
                        
                            com.bbx.wcdemo.WcdemoApplication
                        
                    
                
                
                    
                        make-assembly
                        package
                        
                            single
                        
                    
                
            
        
    


/<code>
  • 自定義mapper
<code>package com.bbx.wcdemo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WCMapper extends Mapper {
    Text text = new Text();
    IntWritable intWritable = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //按行將內容讀取進來
        String[] values = value.toString().split(" ");
        for(String v:values){
            text.set(v);
            //拆分後按照 key,value 寫出   如 bbx:1,多個不會合並
            context.write(text,intWritable);
        }
    }
}
/<code>
  • 自定義reduce
<code>package com.bbx.wcdemo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WCReduce   extends Reducer {
    IntWritable intWritable =new IntWritable();
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        //按照字典順序key分組,加載每一組key,key組內的value值求和
        for(IntWritable value:values){
            sum += value.get();
        }
        intWritable.set(sum);
        context.write(key,intWritable);
    }
}
/<code>
  • Driver 驅動
<code>package com.bbx.wcdemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class WcdemoApplication {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(WcdemoApplication.class);

        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        job.waitForCompletion(true);
    }

}
/<code>
  • 集群環境運行----(輸出路徑不能存在)
<code>hadoop jar wcdemo-0.0.1-SNAPSHOT.jar com.bbx.wcdemo.WcdemoApplication  /home/input/ /home/output/<code>


分享到:


相關文章: