1 前言
Spring Batch是一個輕量級的、完善的批處理框架,作為Spring體系中的一員,它擁有靈活、方便、生產可用的特點。在應對高效處理大量信息、定時處理大量數據等場景十分簡便。
結合調度框架能更大地發揮Spring Batch的作用。
2 Spring Batch的概念知識
2.1 分層架構
Spring Batch的分層架構圖如下:
可以看到它分為三層,分別是:
- Application應用層:包含了所有任務batch jobs和開發人員自定義的代碼,主要是根據項目需要開發的業務流程等。
- Batch Core核心層:包含啟動和管理任務的運行環境類,如JobLauncher等。
- Batch Infrastructure基礎層:上面兩層是建立在基礎層之上的,包含基礎的讀入reader和寫出writer、重試框架等。
2.2 關鍵概念
理解下圖所涉及的概念至關重要,不然很難進行後續開發和問題分析。
2.2.1 JobRepository
專門負責與數據庫打交道,對整個批處理的新增、更新、執行進行記錄。所以Spring Batch是需要依賴數據庫來管理的。
2.2.2 任務啟動器JobLauncher
負責啟動任務Job。
2.2.3 任務Job
Job是封裝整個批處理過程的單位,跑一個批處理任務,就是跑一個Job所定義的內容。
上圖介紹了Job的一些相關概念:
- Job:封裝處理實體,定義過程邏輯。
- JobInstance:Job的運行實例,不同的實例,參數不同,所以定義好一個Job後可以通過不同參數運行多次。
- JobParameters:與JobInstance相關聯的參數。
- JobExecution:代表Job的一次實際執行,可能成功、可能失敗。
所以,開發人員要做的事情,就是定義Job。
2.2.4 步驟Step
Step是對Job某個過程的封裝,一個Job可以包含一個或多個Step,一步步的Step按特定邏輯執行,才代表Job執行完成。
通過定義Step來組裝Job可以更靈活地實現複雜的業務邏輯。
2.2.5 輸入——處理——輸出
所以,定義一個Job關鍵是定義好一個或多個Step,然後把它們組裝好即可。而定義Step有多種方法,但有一種常用的模型就是輸入——處理——輸出,即Item Reader、Item Processor和Item Writer。比如通過Item Reader從文件輸入數據,然後通過Item Processor進行業務處理和數據轉換,最後通過Item Writer寫到數據庫中去。
Spring Batch為我們提供了許多開箱即用的Reader和Writer,非常方便。
3 代碼實例
理解了基本概念後,就直接通過代碼來感受一下吧。整個項目的功能是從多個csv文件中讀數據,處理後輸出到一個csv文件。
3.1 基本框架
添加依賴:
<code><
dependency
><
groupId
>org.springframework.bootgroupId
><
artifactId
>spring-boot-starter-batchartifactId
>dependency
><
dependency
><
groupId
>com.h2databasegroupId
><
artifactId
>h2artifactId
><
scope
> runtimescope
>dependency
> /<code>
需要添加Spring Batch的依賴,同時使用H2作為內存數據庫比較方便,實際生產肯定是要使用外部的數據庫,如Oracle、PostgreSQL。
入口主類:
<code>@SpringBootApplication
@EnableBatchProcessing
public class PkslowBatchJobMain {public
static
void
main
(String[] args) {SpringApplication
.run
(PkslowBatchJobMain.class, args); } } /<code>
也很簡單,只是在Springboot的基礎上添加註解@EnableBatchProcessing。
領域實體類Employee:
<code>package
com.pkslow.batch.entity;public
class Employee {String
id;String
firstName;String
last
Name; } /<code>
對應的csv文件內容如下:
<code>id,firstName,lastName
1
,Lokesh,Gupta
2
,Amit,Mishra
3
,Pankaj,Kumar
4
,David,Miller
/<code>
3.2 輸入——處理——輸出
3.2.1 讀取ItemReader
因為有多個輸入文件,所以定義如下:
<code> ("input/inputData*.csv"
)private
Resource[] inputResources;public
MultiResourceItemReadermultiResourceItemReader
()
{ MultiResourceItemReader resourceItemReader =new
MultiResourceItemReader(); resourceItemReader.setResources(inputResources); resourceItemReader.setDelegate(reader());return
resourceItemReader; }public
FlatFileItemReaderreader
()
{ FlatFileItemReader reader =new
FlatFileItemReader(); reader.setLinesToSkip(1
); reader.setLineMapper(new
DefaultLineMapper() { { setLineTokenizer(new
DelimitedLineTokenizer() { { setNames(new
String[] {"id"
,"firstName"
,"lastName"
}); } }); setFieldSetMapper(new
BeanWrapperFieldSetMapper() { { setTargetType(Employee.
class
); } }); } });return
reader; } /<code>
這裡使用了FlatFileItemReader,方便我們從文件讀取數據。
3.2.2 處理ItemProcessor
為了簡單演示,處理很簡單,就是把最後一列轉為大寫:
<code>public
ItemProcessor itemProcessor() {return
employee -> { employee.setLastName(employee.getLastName().toUpperCase());return
employee; }; } /<code>
3.2.3 輸出ItremWriter
比較簡單,代碼及註釋如下:
<code>private
Resource outputResource =new
FileSystemResource("output/outputData.csv"
);public
FlatFileItemWriterwriter
()
{ FlatFileItemWriter writer =new
FlatFileItemWriter<>(); writer.setResource(outputResource); writer.setAppendAllowed(true
); writer.setLineAggregator(new
DelimitedLineAggregator() { { setDelimiter(","
); setFieldExtractor(new
BeanWrapperFieldExtractor() { { setNames(new
String[] {"id"
,"firstName"
,"lastName"
}); } }); } });return
writer; } /<code>
3.3 Step
有了Reader-Processor-Writer後,就可以定義Step了:
<code>@Bean
public Step csvStep() {return
stepBuilderFactory
.get
("csvStep
").<Employee
,Employee
>chunk
(5).reader
(multiResourceItemReader
()).processor
(itemProcessor
()).writer
(writer
()).build
(); } /<code>
這裡有一個chunk的設置,值為5,意思是5條記錄後再提交輸出,可以根據自己需求定義。
3.4 Job
完成了Step的編碼,定義Job就容易了:
<code>@Bean
public
JobpkslowCsvJob
() {return
jobBuilderFactory .get
("pkslowCsvJob"
) .incrementer(new
RunIdIncrementer()) .start(csvStep()) .build(); } /<code>
3.5 運行
完成以上編碼後,執行程序,結果如下:
成功讀取數據,並將最後字段轉為大寫,並輸出到outputData.csv文件。
4 監聽Listener
可以通過Listener接口對特定事件進行監聽,以實現更多業務功能。比如如果處理失敗,就記錄一條失敗日誌;處理完成,就通知下游拿數據等。
我們分別對Read、Process和Write事件進行監聽,對應分別要實現ItemReadListener接口、ItemProcessListener接口和ItemWriteListener接口。因為代碼比較簡單,就是打印一下日誌,這裡只貼出ItemWriteListener的實現代碼:
<code>public
class
PkslowWriteListener
implements
ItemWriteListener
{private
static
final
Log logger = LogFactory.getLog(PkslowWriteListener.class); @Override
public
void
beforeWrite
(List extends Employee>
list
) { logger.info("beforeWrite: "
+list
); } @Override
public
void
afterWrite
(List extends Employee>
list
) { logger.info("afterWrite: "
+list
); } @Override
public
void
onWriteError
(Exception e, List extends Employee>
list
) { logger.info("onWriteError: "
+list
); } } /<code>
把實現的監聽器listener整合到Step中去:
<code>@Bean
public Step csvStep() {return
stepBuilderFactory
.get
("csvStep
").<Employee
,Employee
>chunk
(5).reader
(multiResourceItemReader
()).listener
(new
PkslowReadListener
()).processor
(itemProcessor
()).listener
(new
PkslowProcessListener
()).writer
(writer
()).listener
(new
PkslowWriteListener
()).build
(); } /<code>
執行後看一下日誌:
這裡就能明顯看到之前設置的chunk的作用了。Writer每次是處理5條記錄,如果一條輸出一次,會對IO造成壓力。
5 總結
Spring Batch還有許多優秀的特性,如面對大量數據時的並行處理。本文主要入門介紹為主,不一一介紹,後續會專門講解。
多讀書,多分享;多寫作,多整理。