前面,已經探討了:
•
•
本文對Spring Cloud Stream,做一個知識點盤點和總結,包括:
•概念
•Stream註解
•Spring Cloud Integration(Spring Cloud Stream的底層)註解
•Spring Messaging(Spring消息編程模型)註解
•Spring Cloud Stream API
概念
group
組內只有1個實例消費。如果不設置group,則stream會自動為每個實例創建匿名且獨立的group——於是每個實例都會消費。
組內單次只有1個實例消費,並且會輪詢負載均衡。通常,在將應用程序綁定到給定目標時,最好始終指定consumer group。
destination binder
與外部消息系統通信的組件,為構造 Binding提供了 2 個方法,分別是 bindConsumer 和 bindProducer ,它們分別用於構造生產者和消費者。Binder使Spring Cloud Stream應用程序可以靈活地連接到中間件,目前spring為kafka、rabbitmq提供binder。
destination binding
Binding 是連接應用程序跟消息中間件的橋樑,用於消息的消費和生產,由binder創建。
partition
TIPS
嚴格來說這個不是概念,而是一種Stream提高伸縮性、吞吐量的一種方式。不過不想另起標題了,寫在這裡吧。
一個或多個生產者將數據發送到多個消費者,並確保有共同特徵標識的數據由同一個消費者處理。默認是對消息進行hashCode,然後根據分區個數取餘,所以對於相同的消息,總會落到同一個消費者上。
註解
Input(Stream)
示例:
public interface Barista { @Input("inboundOrders") SubscribableChannel orders(); }
作用:
•用於接收消息
•為每個binding生成channel實例
•指定channel名稱
•在spring容器中生成一個名為inboundOrders,類型為SubscribableChannel的bean
•在spring容器中生成一個類,實現Barista接口。
Output(Stream)
示例:
public interface Source { @Output MessageChannel output(); }
作用:
類似Input,只是用來生產消息。
StreamListener(Stream)
示例:
@StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'") public void handle(String body) { System.out.println("Received: " + body); } @Bean @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "2")) public MessageSource test() { return () -> { Map map = new HashMap<>(1); map.put("type", "dog"); return new GenericMessage<>("abcdef", map); }; }
作用:
用於消費消息
condition的作用:符合條件,才進入處理方法。
condition起作用的兩個條件:
•註解的方法沒有返回值
•方法是一個獨立方法,不支持Reactive API
SendTo(messaging)
示例:
// 接收INPUT這個channel的消息,並將返回值發送到OUTPUT這個channel @StreamListener(Sink.INPUT) @SendTo(Source.OUTPUT) public String receive(String receiveMsg) { return "handle..."; }
作用:
用於發送消息
InboundChannelAdapter(Integration)
示例:
@Bean @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1")) public MessageSource test() { return () -> new GenericMessage<>("Hello Spring Cloud Stream"); }
作用:
表示讓定義的方法生產消息。
注:用 InboundChannelAdapter 註解的方法上即使有參數也沒用。即下面test方法不要有參數。
•fixedDelay:多少毫秒發送1次
•maxMessagesPerPoll:一次發送幾條消息。
ServiceActivator(Integration)
示例:
@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT) public String transform(String payload) { return payload.toUpperCase(); }
作用:
表示方法能夠處理消息或消息有效內容,監聽input消息,用方法體的代碼處理,然後輸出到output中。
Transformer(Integration)
示例:
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(String message) { return message.toUpperCase(); }
作用:
和 ServiceActivator 類似,表示方法能夠轉換消息,消息頭,或消息有效內容
PollableMessageSource(Stream)
示例代碼:
@SpringBootApplication @EnableBinding({ConsumerApplication.PolledProcessor.class}) @EnableScheduling public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } @Autowired private PolledProcessor polledProcessor; @Scheduled(fixedDelay = 5_000) public void poll() { polledProcessor.input().poll(message -> { byte[] bytes = (byte[]) message.getPayload(); String payload = new String(bytes); System.out.println(payload); }); } public interface PolledProcessor { @Input PollableMessageSource input(); @Output MessageChannel output(); } @Bean @InboundChannelAdapter(value = "output", poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) public MessageSource test() { return () -> { Map map = new HashMap<>(1); map.put("type", "dog"); return new GenericMessage<>("adfdfdsafdsfa", map); }; } }
如果不想自己做byte數組轉換,可以添加配置:
spring: cloud: stream: bindings: output: # 指定content-type content-type: text/plain
作用:
允許消費者控制消費速率。
相關文章:
https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers[1]
乾貨分享
最近將個人學習筆記整理成冊,使用PDF分享。關注我,回覆如下代碼,即可獲得百度盤地址,無套路領取!
•001:《Java併發與高併發解決方案》學習筆記;
•002:《深入JVM內核——原理、診斷與優化》學習筆記;
•003:《Java面試寶典》
•004:《Docker開源書》
•005:《Kubernetes開源書》
•006:《DDD速成(領域驅動設計速成)》