TIPS
本文基於Spring Cloud Greenwich SR1 + spring-cloud-starter-stream-rocketmq 0.9.0
理論兼容:Spring Cloud Finchley+ +spring-cloud-starter-stream-rocketmq 0.2.2+
MQ使用的是RocketMQ,也可使用Kafka或者RabbitMQ。
本文探討Spring Cloud Stream & RocketMQ過濾消息的各種姿勢。
在實際項目中,我們可能需要實現消息消費的過濾。
舉個例子:實現消息的分流處理:
生產者生產的消息,雖然消息體可能一樣,但是header不一樣。可編寫兩個或者更多的消費者,對不同header的消息做針對性的處理!
condition
生產者
生產者設置一下header,比如my-header,值根據你的需要填寫:
@Autowired private Source source; public String testStream() { this.source.output() .send( MessageBuilder .withPayload("消息體") .setHeader("my-header","你的header") .build() ); return "success"; }
消費者
@Service @Slf4j public class TestStreamConsumer { @StreamListener(value = Sink.INPUT,condition = "headers['my-header']=='你的header'") public void receive(String messageBody) { log.info("通過stream收到了消息:messageBody ={}", messageBody); } }
如代碼所示,使用 StreamListener 註解的 condition 屬性。當 headers['my-header']=='你的header' 條件滿足,才會進入到方法體。
Tags
TIPS
該方式只支持RoketMQ,不支持Kafka/RabbitMQ
生產者
@Autowired private Source source; public String testStream() { this.source.output() .send( MessageBuilder .withPayload("消息體") // 注意:只能設置1個tag .setHeader(RocketMQHeaders.TAGS, "tag1") .build() ); return "success"; }
消費者
1 接口
public interface MySink { String INPUT1 = "input1"; String INPUT2 = "input2"; @Input(INPUT1) SubscribableChannel input(); @Input(INPUT2) SubscribableChannel input2(); }
2 註解
@EnableBinding({MySink.class})
3 配置
spring: cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: input1: consumer: # 表示input2消費帶有tag1的消息 tags: tag1 input2: consumer: # 表示input2消費帶有tag2或者tag3的消息 tags: tag2||tag3 bindings: input1: destination: test-topic group: test-group1 input2: destination: test-topic group: test-group2
4 消費代碼
@Service @Slf4j public class MyTestStreamConsumer { /** * 我消費帶有tag1的消息 * * @param messageBody 消息體 */ @StreamListener(MySink.INPUT1) public void receive1(String messageBody) { log.info("帶有tag1的消息被消費了:messageBody ={}", messageBody); } /** * 我消費帶有tag1或者tag2的消息 * * @param messageBody 消息體 */ @StreamListener(MySink.INPUT2) public void receive2(String messageBody) { log.info("帶有tag2/tag3的消息被消費了:messageBody ={}", messageBody); } }
5 日誌:
2019-08-04 19:10:03.799 INFO 53760 --- [MessageThread_1] c.i.u.rocketmq.MyTestStreamConsumer : 帶有tag1的消息被消費了:messageBody =消息體
Sql 92
TIPS
•該方式只支持RoketMQ,不支持Kafka/RabbitMQ
•用了sql,就不要用Tag
RocketMQ支持使用SQL語法過濾消息。官方文檔:http://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/
Spring Clous Stream RocketMQ也為此特性提供了支持。
開啟SQL 92支持
默認情況下,RocketMQ的SQL過濾支持是關閉的,要想使用SQL 92過濾消息,需要:
1 在 conf/broker.conf 添加
enablePropertyFilter = true
2 啟動RocketMQ
nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &
生產者
@Autowired private Source source; public String testStream() { this.source.output() .send( MessageBuilder .withPayload("消息體") .setHeader("index", 1000) .build() ); return "success"; }
消費者
1 接口
public interface MySink { String INPUT1 = "input1"; String INPUT2 = "input2"; @Input(INPUT1) SubscribableChannel input(); @Input(INPUT2) SubscribableChannel input2(); }
2 註解
@EnableBinding({MySink.class})
3 配置
spring: cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: input1: consumer: sql: 'index < 1000' input2: consumer: sql: 'index >= 1000' bindings: input1: destination: test-topic group: test-group1 input2: destination: test-topic group: test-group2
4 消費代碼
@Service @Slf4j public class MyTestStreamConsumer { /** * 我消費帶有tag1的消息 * * @param messageBody 消息體 */ @StreamListener(MySink.INPUT1) public void receive1(String messageBody) { log.info("index > 1000的消息被消費了:messageBody ={}", messageBody); } /** * 我消費帶有tag1或者tag2的消息 * * @param messageBody 消息體 */ @StreamListener(MySink.INPUT2) public void receive2(String messageBody) { log.info("index <=1000 的消息被消費了:messageBody ={}", messageBody); } }
5 日誌
2019-08-04 19:58:59.787 INFO 56375 --- [MessageThread_1] c.i.u.rocketmq.MyTestStreamConsumer : index <=1000 的消息被消費了:messageBody =消息體
相關代碼
org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties
參考文檔
•Filter Messages By SQL92 In RocketMQ[1]
•RocketMQ 錯誤:The broker does not support consumer to filter message by SQL92[2]
乾貨分享
最近將個人學習筆記整理成冊,使用PDF分享。關注我,回覆如下代碼,即可獲得百度盤地址,無套路領取!
•001:《Java併發與高併發解決方案》學習筆記;
•002:《深入JVM內核——原理、診斷與優化》學習筆記;
•003:《Java面試寶典》
•004:《Docker開源書》
•005:《Kubernetes開源書》
•006:《DDD速成(領域驅動設計速成)》