通過例子講解Spring Batch入門,優秀的批處理框架

1 前言

Spring Batch是一個輕量級的、完善的批處理框架,作為Spring體系中的一員,它擁有靈活、方便、生產可用的特點。在應對高效處理大量信息、定時處理大量數據等場景十分簡便。

結合調度框架能更大地發揮Spring Batch的作用。

2 Spring Batch的概念知識

2.1 分層架構

Spring Batch的分層架構圖如下:

通過例子講解Spring Batch入門,優秀的批處理框架

可以看到它分為三層,分別是:

  • Application應用層:包含了所有任務batch jobs和開發人員自定義的代碼,主要是根據項目需要開發的業務流程等。
  • Batch Core核心層:包含啟動和管理任務的運行環境類,如JobLauncher等。
  • Batch Infrastructure基礎層:上面兩層是建立在基礎層之上的,包含基礎的讀入reader和寫出writer、重試框架等。

2.2 關鍵概念

理解下圖所涉及的概念至關重要,不然很難進行後續開發和問題分析。

通過例子講解Spring Batch入門,優秀的批處理框架

2.2.1 JobRepository

專門負責與數據庫打交道,對整個批處理的新增、更新、執行進行記錄。所以Spring Batch是需要依賴數據庫來管理的。

2.2.2 任務啟動器JobLauncher

負責啟動任務Job。

2.2.3 任務Job

Job是封裝整個批處理過程的單位,跑一個批處理任務,就是跑一個Job所定義的內容。

通過例子講解Spring Batch入門,優秀的批處理框架

上圖介紹了Job的一些相關概念:

  • Job:封裝處理實體,定義過程邏輯。
  • JobInstance:Job的運行實例,不同的實例,參數不同,所以定義好一個Job後可以通過不同參數運行多次。
  • JobParameters:與JobInstance相關聯的參數。
  • JobExecution:代表Job的一次實際執行,可能成功、可能失敗。

所以,開發人員要做的事情,就是定義Job。

2.2.4 步驟Step

Step是對Job某個過程的封裝,一個Job可以包含一個或多個Step,一步步的Step按特定邏輯執行,才代表Job執行完成。

通過例子講解Spring Batch入門,優秀的批處理框架

通過定義Step來組裝Job可以更靈活地實現複雜的業務邏輯。

2.2.5 輸入——處理——輸出

所以,定義一個Job關鍵是定義好一個或多個Step,然後把它們組裝好即可。而定義Step有多種方法,但有一種常用的模型就是輸入——處理——輸出,即Item Reader、Item Processor和Item Writer。比如通過Item Reader從文件輸入數據,然後通過Item Processor進行業務處理和數據轉換,最後通過Item Writer寫到數據庫中去。

Spring Batch為我們提供了許多開箱即用的Reader和Writer,非常方便。

3 代碼實例

理解了基本概念後,就直接通過代碼來感受一下吧。整個項目的功能是從多個csv文件中讀數據,處理後輸出到一個csv文件。

3.1 基本框架

添加依賴:

<code>

<

dependency

>

  

<

groupId

>

org.springframework.boot

groupId

>

  

<

artifactId

>

spring-boot-starter-batch

artifactId

>

dependency

>

<

dependency

>

  

<

groupId

>

com.h2database

groupId

>

  

<

artifactId

>

h2

artifactId

>

  

<

scope

>

runtime

scope

>

dependency

>

/<code>

需要添加Spring Batch的依賴,同時使用H2作為內存數據庫比較方便,實際生產肯定是要使用外部的數據庫,如Oracle、PostgreSQL。

入口主類:

<code>

@SpringBootApplication

@EnableBatchProcessing

public class PkslowBatchJobMain {     

public

 

static

 

void

 

main

(String[] args) {         

SpringApplication

.run

(PkslowBatchJobMain.class, args);     } } /<code>

也很簡單,只是在Springboot的基礎上添加註解@EnableBatchProcessing。

領域實體類Employee:

<code>

package

 com.pkslow.batch.entity;

public

 class Employee {     

String

 id;     

String

 firstName;     

String

 

last

Name; } /<code>

對應的csv文件內容如下:

<code>

id,firstName,lastName

1

,Lokesh,Gupta

2

,Amit,Mishra

3

,Pankaj,Kumar

4

,David,Miller

/<code>

3.2 輸入——處理——輸出

3.2.1 讀取ItemReader

因為有多個輸入文件,所以定義如下:

<code> (

"input/inputData*.csv"

)

private

 Resource[] inputResources;

public

 MultiResourceItemReader 

multiResourceItemReader

()

{   MultiResourceItemReader resourceItemReader = 

new

 MultiResourceItemReader();   resourceItemReader.setResources(inputResources);   resourceItemReader.setDelegate(reader());   

return

 resourceItemReader; }

public

 FlatFileItemReader 

reader

()

{   FlatFileItemReader reader = 

new

 FlatFileItemReader();      reader.setLinesToSkip(

1

);   reader.setLineMapper(

new

 DefaultLineMapper() {     {       setLineTokenizer(

new

 DelimitedLineTokenizer() {         {                      setNames(

new

 String[] { 

"id"

"firstName"

"lastName"

 });         }       });       setFieldSetMapper(

new

 BeanWrapperFieldSetMapper() {         {                      setTargetType(Employee

.

class

)

;         }       });     }   });   

return

 reader; } /<code>

這裡使用了FlatFileItemReader,方便我們從文件讀取數據。

3.2.2 處理ItemProcessor

為了簡單演示,處理很簡單,就是把最後一列轉為大寫:

<code>

public

 ItemProcessor itemProcessor() {   

return

 employee -> {     employee.setLastName(employee.getLastName().toUpperCase());     

return

 employee;   }; } /<code>

3.2.3 輸出ItremWriter

比較簡單,代碼及註釋如下:

<code>

private

 Resource outputResource = 

new

 FileSystemResource(

"output/outputData.csv"

);

public

 FlatFileItemWriter 

writer

()

{   FlatFileItemWriter writer = 

new

 FlatFileItemWriter<>();   writer.setResource(outputResource);      writer.setAppendAllowed(

true

);   writer.setLineAggregator(

new

 DelimitedLineAggregator() {     {              setDelimiter(

","

);       setFieldExtractor(

new

 BeanWrapperFieldExtractor() {         {                      setNames(

new

 String[] { 

"id"

"firstName"

"lastName"

 });         }       });     }   });   

return

 writer; } /<code>

3.3 Step

有了Reader-Processor-Writer後,就可以定義Step了:

<code>

@Bean

public Step csvStep() {   

return

 

stepBuilderFactory

.get

("

csvStep

").<

Employee

Employee

>

chunk

(5)     

.reader

(

multiResourceItemReader

())     

.processor

(

itemProcessor

())     

.writer

(

writer

())     

.build

(); } /<code>

這裡有一個chunk的設置,值為5,意思是5條記錄後再提交輸出,可以根據自己需求定義。

3.4 Job

完成了Step的編碼,定義Job就容易了:

<code>@

Bean

public

 Job 

pkslowCsvJob

(

)

 {   

return

 jobBuilderFactory     .

get

(

"pkslowCsvJob"

)     .incrementer(

new

 RunIdIncrementer())     .start(csvStep())     .build(); } /<code>

3.5 運行

完成以上編碼後,執行程序,結果如下:

通過例子講解Spring Batch入門,優秀的批處理框架

成功讀取數據,並將最後字段轉為大寫,並輸出到outputData.csv文件。

4 監聽Listener

可以通過Listener接口對特定事件進行監聽,以實現更多業務功能。比如如果處理失敗,就記錄一條失敗日誌;處理完成,就通知下游拿數據等。

我們分別對Read、Process和Write事件進行監聽,對應分別要實現ItemReadListener接口、ItemProcessListener接口和ItemWriteListener接口。因為代碼比較簡單,就是打印一下日誌,這裡只貼出ItemWriteListener的實現代碼:

<code>

public

 

class

 

PkslowWriteListener

 

implements

 

ItemWriteListener

 {

    

private

 

static

 

final

 Log logger = LogFactory.getLog(PkslowWriteListener.class);     @

Override     

public

 

void

 

beforeWrite

(List extends Employee> 

list

)

 

{         logger.info(

"beforeWrite: "

 + 

list

);     }     @

Override     

public

 

void

 

afterWrite

(List extends Employee> 

list

)

 

{         logger.info(

"afterWrite: "

 + 

list

);     }     @

Override     

public

 

void

 

onWriteError

(Exception e, List extends Employee> 

list

)

 

{         logger.info(

"onWriteError: "

 + 

list

);     } } /<code>

把實現的監聽器listener整合到Step中去:

<code>

@Bean

public Step csvStep() {   

return

 

stepBuilderFactory

.get

("

csvStep

").<

Employee

Employee

>

chunk

(5)     

.reader

(

multiResourceItemReader

())     

.listener

(

new

 

PkslowReadListener

())     

.processor

(

itemProcessor

())     

.listener

(

new

 

PkslowProcessListener

())     

.writer

(

writer

())     

.listener

(

new

 

PkslowWriteListener

())     

.build

(); } /<code>

執行後看一下日誌:

通過例子講解Spring Batch入門,優秀的批處理框架

這裡就能明顯看到之前設置的chunk的作用了。Writer每次是處理5條記錄,如果一條輸出一次,會對IO造成壓力。

5 總結

Spring Batch還有許多優秀的特性,如面對大量數據時的並行處理。本文主要入門介紹為主,不一一介紹,後續會專門講解。

多讀書,多分享;多寫作,多整理。


分享到:


相關文章: