Spring Boot 事件和監聽

1、自定義事件和監聽

1.1、定義事件

 1 package com.cjs.boot.event;
2
3 import lombok.Data;
4 import org.springframework.context.ApplicationEvent;
5
6 @Data
7 public class BlackListEvent extends ApplicationEvent {
8
9 private String address;
10
11 public BlackListEvent(Object source, String address) {
12 super(source);
13 this.address = address;
14 }
15 }

1.2、定義監聽

 1 package com.cjs.boot.event;
2
3 import org.springframework.context.ApplicationListener;
4 import org.springframework.context.event.EventListener;
5 import org.springframework.stereotype.Component;
6
7 8 public class BlackListListener implements ApplicationListener<blacklistevent> {
9
10 @Override
11 public void onApplicationEvent(BlackListEvent event) {
12 System.out.println("監聽到BlackListEvent事件: " + event.getAddress());
13 try {
14 Thread.sleep(2000);
15 } catch (InterruptedException e) {
16 e.printStackTrace();
17 }
18 }
19 }
/<blacklistevent>

1.3、註冊監聽

 1 package com.cjs.boot;
2
3 import com.cjs.boot.event.BlackListListener;
4 import org.springframework.boot.SpringApplication;
5 import org.springframework.boot.autoconfigure.SpringBootApplication;
6 import org.springframework.boot.web.server.ErrorPage;
7 import org.springframework.boot.web.server.ErrorPageRegistrar;
8 import org.springframework.boot.web.server.ErrorPageRegistry;
9 import org.springframework.cache.annotation.EnableCaching;
10 import org.springframework.context.annotation.Bean;
11 import org.springframework.http.HttpStatus;
12 import org.springframework.scheduling.annotation.EnableAsync;
13
14 @SpringBootApplication
15 public class CjsSpringbootExampleApplication {
16
17 public static void main(String[] args) {
18
19 SpringApplication springApplication = new SpringApplication(CjsSpringbootExampleApplication.class);
20 springApplication.addListeners(new BlackListListener());
21 springApplication.run(args);
22
23 }

1.4、發佈事件

 1 package com.cjs.boot.controller;
2
3 import com.cjs.boot.event.BlackListEvent;
4 import org.springframework.beans.factory.annotation.Autowired;
5 import org.springframework.context.ApplicationContext;
6 import org.springframework.context.ApplicationEventPublisher;
7 import org.springframework.web.bind.annotation.GetMapping;
8 import org.springframework.web.bind.annotation.RequestMapping;
9 import org.springframework.web.bind.annotation.RestController;
10
11 @RestController
12 @RequestMapping("/activity")
13 public class ActivityController {
14
15 // @Autowired
16 // private ApplicationEventPublisher publisher;
17
18 @Autowired
19 private ApplicationContext publisher;
20

21 @GetMapping("/sayHello.json")
22 public void sayHello() {
23
24 /**
25 * You may register as many event listeners as you wish, but note that by default event listeners receive events synchronously.
26 * This means the publishEvent() method blocks until all listeners have finished processing the event.
27 */
28
29 BlackListEvent event = new BlackListEvent(this, "[email protected]");
30 publisher.publishEvent(event);
31 System.out.println("事件發佈成功");
32 }
33
34 }

2、基於註解的事件監聽

package com.cjs.boot.event;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
/**
 * Annotation-driven listener: declared as a Spring component whose
 * {@code @EventListener} method receives {@link BlackListEvent} instances.
 */
@Component
public class BlackListListener {

    /**
     * Invoked for each published {@link BlackListEvent}; prints a fixed marker value.
     *
     * @param event the event that was published
     */
    @EventListener
    public void processBlackListEvent(BlackListEvent event) {
        System.out.println(123);
    }
}
---
package com.cjs.boot;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Application entry point for the annotation-based variant; no listener is
 * registered by hand here.
 */
@SpringBootApplication
public class CjsSpringbootExampleApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(CjsSpringbootExampleApplication.class, args);
    }
}

3、異步監聽

1 @EventListener
2 @Async

3 public void processBlackListEvent(BlackListEvent event) {
4 // BlackListEvent is processed in a separate thread
5 }

4、應用

 6 import lombok.extern.slf4j.Slf4j;
7 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.context.event.EventListener;
9 import org.springframework.scheduling.annotation.Async;
10 import org.springframework.stereotype.Component;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.atomic.AtomicInteger;
17
18 /**
19 * 批量送券
20 */
21 @Slf4j
22 @Component
23 public class BatchSendCouponListener {
24
25 @Autowired
26 private CouponPresentLogService couponPresentLogService;
27
28 @Async
29 @EventListener
30 public void processBatchSendCouponEvent(BatchSendCouponEvent batchSendCouponEvent) {
31 Long cpId = batchSendCouponEvent.getCouponPresentId();
32 log.info("收到BatchSendCouponEvent, cpId={}", cpId);
33 List<couponpresentlogentity> list = couponPresentLogService.selectByPid(cpId);
34
35 handle(cpId, list, 0);
36 }
37
38 private void handle(Long cpId, List<couponpresentlogentity> list, int times) {
39 if (times >= 2) {
40 log.info("超過重試次數退出, cpId: {}, 剩餘: {}", cpId, list.size());
41 return;
42 }
43
44 List<future>> futureList = new ArrayList<>();
45

46 for (CouponPresentLogEntity entity : list) {
47 futureList.add(couponPresentLogService.present(entity));
48 }
49
50 AtomicInteger count = new AtomicInteger(0);
51 // 收集失敗的
52 List<couponpresentlogentity> failList = new ArrayList<>();
53 for (Future<couponpresentlogentity> future : futureList) {
54 try {
55 CouponPresentLogEntity couponPresentLogEntity = future.get();
56 if (couponPresentLogEntity.getStatus() != PresentStatusEnum.SUCCESS.getType().intValue()) {
57 failList.add(couponPresentLogEntity);
58 }
59 count.getAndIncrement();
60 if (count.intValue() >= list.size()) {
61 List<couponpresentlogentity> failPresentLogList = couponPresentLogService.selectFailLogByPid(cpId);
62 if (null != failPresentLogList && failPresentLogList.size() > 0) {
63 times++;
64 log.info("第{}次重試, CPID: {}, 總計: {}, 失敗: {}", times, cpId, list.size(), failPresentLogList.size());
65 handle(cpId, failPresentLogList, times);
66 }
67 }
68 } catch (InterruptedException e) {
69 log.error(e.getMessage(), e);
70 } catch (ExecutionException e) {
71 log.error(e.getMessage(), e);
72 }
73 }
74 }
75
76 }
/<couponpresentlogentity>/<couponpresentlogentity>/<couponpresentlogentity>/<future>/<couponpresentlogentity>/<couponpresentlogentity>
 1 import lombok.extern.slf4j.Slf4j;
2 import org.springframework.beans.factory.annotation.Autowired;
3 import org.springframework.scheduling.annotation.Async;
4 import org.springframework.scheduling.annotation.AsyncResult;
5 import org.springframework.stereotype.Service;
6
7 import javax.annotation.Resource;
8 import java.util.concurrent.*;
9
10 @Service
11 @Slf4j
12 public class CouponPresentLogServiceImpl implements CouponPresentLogService {
13
14 @Autowired
15 private CouponPresentLogDao couponPresentLogDao;
16 @Resource

17 private CouponSendRpcService couponSendRpcService;
18
19 @Async("myThreadPoolTaskExecutor")
20 @Override
21 public Future<couponpresentlogentity> present(CouponPresentLogEntity entity) {
22 try {
23 CouponBaseResponse rst = couponSendRpcService.send(entity.getUserId(), entity.getCouponBatchKey(), "1", entity.getVendorId());
24 if (null != rst && rst.isSuccess()) {
25 entity.setStatus(PresentStatusEnum.SUCCESS.getType());
26 entity.setFailureReason(PresentStatusEnum.SUCCESS.getName());
27 }else {
28 String reason = (null == rst) ? "響應異常" : rst.getMsg();
29 entity.setFailureReason(reason);
30 entity.setStatus(PresentStatusEnum.FAILURE.getType());
31 }
32 }catch (Exception ex) {
33 log.error(ex.getMessage(), ex);
34 entity.setFailureReason(ex.getMessage());
35 entity.setStatus(PresentStatusEnum.FAILURE.getType());
36 }
37 couponPresentLogDao.update(entity);
38
39 return new AsyncResult<couponpresentlogentity>(entity);
40 }
41
42 }
/<couponpresentlogentity>/<couponpresentlogentity>

5、統計異步任務執行的進度

利用Future獲取執行結果。比如上面的例子中,由於任務不是直接提交給線程池,而是通過@Async方法執行的,所以用AsyncResult來包裝並返回結果

上面的例子中,一個大任務,然後下面有許多子任務。在主任務中,統計各子任務的執行情況,是成功還是失敗,然後統計成功多少,失敗多少

也可以這樣寫:

@Autowired
ThreadPoolTaskExecutor taskExecutor;
Future<object> future = taskExecutor.submit(new Callable<object>() {
@Override
public Object call() throws Exception {
return null;
}
});
/<object>/<object>

如果想學習Java工程化、高性能及分佈式、深入淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群裡有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給大家。


分享到:


相關文章: