Spring Cloud Stream實現消息過濾消費

TIPS

本文基於Spring Cloud Greenwich SR1 + spring-cloud-starter-stream-rocketmq 0.9.0

理論兼容:Spring Cloud Finchley+ +spring-cloud-starter-stream-rocketmq 0.2.2+

MQ使用的是RocketMQ,也可使用Kafka或者RabbitMQ。

本文探討Spring Cloud Stream & RocketMQ過濾消息的各種姿勢。

在實際項目中,我們可能需要實現消息消費的過濾。

舉個例子:實現消息的分流處理:

生產者生產的消息,雖然消息體可能一樣,但是header不一樣。可編寫兩個或者更多的消費者,對不同header的消息做針對性的處理!

condition

生產者

生產者設置一下header,比如my-header,值根據你的需要填寫:

@Autowired
private Source source;
public String testStream() {
 this.source.output()
 .send(
 MessageBuilder
 .withPayload("消息體")
 .setHeader("my-header","你的header")
 .build()
 );
 return "success";
}

消費者

@Service
@Slf4j
public class TestStreamConsumer {
 @StreamListener(value = Sink.INPUT,condition = "headers['my-header']=='你的header'")
 public void receive(String messageBody) {
 log.info("通過stream收到了消息:messageBody ={}", messageBody);
 }
}

如代碼所示,使用 StreamListener 註解的 condition 屬性。當 headers['my-header']=='你的header' 條件滿足,才會進入到方法體。

Tags

TIPS

該方式只支持RoketMQ,不支持Kafka/RabbitMQ

生產者

@Autowired
private Source source;
public String testStream() {
 this.source.output()
 .send(
 MessageBuilder
 .withPayload("消息體")
 // 注意:只能設置1個tag
 .setHeader(RocketMQHeaders.TAGS, "tag1")
 .build()
 );
 return "success";
}

消費者

1 接口

public interface MySink {
 String INPUT1 = "input1";
 String INPUT2 = "input2";
 @Input(INPUT1)
 SubscribableChannel input();
 @Input(INPUT2)
 SubscribableChannel input2();
}

2 註解

@EnableBinding({MySink.class})

3 配置

spring:
 cloud:
 stream:
 rocketmq:
 binder:
 name-server: 127.0.0.1:9876
 bindings:
 input1:
 consumer:
 # 表示input2消費帶有tag1的消息
 tags: tag1
 input2:
 consumer:
 # 表示input2消費帶有tag2或者tag3的消息
 tags: tag2||tag3
 bindings:
 input1:
 destination: test-topic
 group: test-group1
 input2:
 destination: test-topic
 group: test-group2

4 消費代碼

@Service
@Slf4j
public class MyTestStreamConsumer {
 /**
 * 我消費帶有tag1的消息
 *
 * @param messageBody 消息體
 */
 @StreamListener(MySink.INPUT1)
 public void receive1(String messageBody) {
 log.info("帶有tag1的消息被消費了:messageBody ={}", messageBody);
 }
 /**
 * 我消費帶有tag1或者tag2的消息
 *
 * @param messageBody 消息體
 */
 @StreamListener(MySink.INPUT2)
 public void receive2(String messageBody) {
 log.info("帶有tag2/tag3的消息被消費了:messageBody ={}", messageBody);
 }
}

5 日誌:

2019-08-04 19:10:03.799 INFO 53760 --- [MessageThread_1] c.i.u.rocketmq.MyTestStreamConsumer : 帶有tag1的消息被消費了:messageBody =消息體

Sql 92

TIPS

•該方式只支持RoketMQ,不支持Kafka/RabbitMQ

用了sql,就不要用Tag

RocketMQ支持使用SQL語法過濾消息。官方文檔:http://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/

Spring Clous Stream RocketMQ也為此特性提供了支持。

開啟SQL 92支持

默認情況下,RocketMQ的SQL過濾支持是關閉的,要想使用SQL 92過濾消息,需要:

1 在 conf/broker.conf 添加

enablePropertyFilter = true

2 啟動RocketMQ

nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &

生產者

@Autowired
private Source source;
public String testStream() {
 this.source.output()
 .send(
 MessageBuilder
 .withPayload("消息體")
 .setHeader("index", 1000)
 .build()
 );
 return "success";
}

消費者

1 接口

public interface MySink {
 String INPUT1 = "input1";
 String INPUT2 = "input2";
 @Input(INPUT1)
 SubscribableChannel input();
 @Input(INPUT2)
 SubscribableChannel input2();
}

2 註解

@EnableBinding({MySink.class})

3 配置

spring:
 cloud:
 stream:
 rocketmq:
 binder:
 name-server: 127.0.0.1:9876
 bindings:
 input1:
 consumer:
 sql: 'index < 1000'
 input2:
 consumer:
 sql: 'index >= 1000'
 bindings:
 input1:
 destination: test-topic
 group: test-group1
 input2:
 destination: test-topic
 group: test-group2

4 消費代碼

@Service
@Slf4j
public class MyTestStreamConsumer {
 /**
 * 我消費帶有tag1的消息
 *
 * @param messageBody 消息體
 */
 @StreamListener(MySink.INPUT1)
 public void receive1(String messageBody) {
 log.info("index > 1000的消息被消費了:messageBody ={}", messageBody);
 }
 /**
 * 我消費帶有tag1或者tag2的消息
 *
 * @param messageBody 消息體
 */
 @StreamListener(MySink.INPUT2)
 public void receive2(String messageBody) {
 log.info("index <=1000 的消息被消費了:messageBody ={}", messageBody);
 }
}

5 日誌

2019-08-04 19:58:59.787 INFO 56375 --- [MessageThread_1] c.i.u.rocketmq.MyTestStreamConsumer : index <=1000 的消息被消費了:messageBody =消息體

相關代碼

org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties

參考文檔

•Filter Messages By SQL92 In RocketMQ[1]

•RocketMQ 錯誤:The broker does not support consumer to filter message by SQL92[2]

乾貨分享

最近將個人學習筆記整理成冊,使用PDF分享。關注我,回覆如下代碼,即可獲得百度盤地址,無套路領取!

•001:《Java併發與高併發解決方案》學習筆記;

•002:《深入JVM內核——原理、診斷與優化》學習筆記;

•003:《Java面試寶典》

•004:《Docker開源書》

•005:《Kubernetes開源書》

•006:《DDD速成(領域驅動設計速成)》


分享到:


相關文章: