Spring Cloud Stream知識點盤點

前面,已經探討了:

本文對Spring Cloud Stream,做一個知識點盤點和總結,包括:

•概念

•Stream註解

•Spring Cloud Integration(Spring Cloud Stream的底層)註解

•Spring Messaging(Spring消息編程模型)註解

•Spring Cloud Stream API

概念

group

組內只有1個實例消費。如果不設置group,則stream會自動為每個實例創建匿名且獨立的group——於是每個實例都會消費。

組內單次只有1個實例消費,並且會輪詢負載均衡。通常,在將應用程序綁定到給定目標時,最好始終指定consumer group。

destination binder

與外部消息系統通信的組件,為構造 Binding提供了 2 個方法,分別是 bindConsumer 和 bindProducer ,它們分別用於構造生產者和消費者。Binder使Spring Cloud Stream應用程序可以靈活地連接到中間件,目前spring為kafka、rabbitmq提供binder。

destination binding

Binding 是連接應用程序跟消息中間件的橋樑,用於消息的消費和生產,由binder創建。

partition

TIPS

嚴格來說這個不是概念,而是一種Stream提高伸縮性、吞吐量的一種方式。不過不想另起標題了,寫在這裡吧。

一個或多個生產者將數據發送到多個消費者,並確保有共同特徵標識的數據由同一個消費者處理。默認是對消息進行hashCode,然後根據分區個數取餘,所以對於相同的消息,總會落到同一個消費者上。

註解

Input(Stream)

示例:

public interface Barista {
 @Input("inboundOrders")
 SubscribableChannel orders();
}

作用:

•用於接收消息

•為每個binding生成channel實例

•指定channel名稱

•在spring容器中生成一個名為inboundOrders,類型為SubscribableChannel的bean

•在spring容器中生成一個類,實現Barista接口。

Output(Stream)

示例:

public interface Source {
 @Output
 MessageChannel output();
}

作用:

類似Input,只是用來生產消息。

StreamListener(Stream)

示例:

@StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'")
public void handle(String body) {
 System.out.println("Received: " + body);
}
@Bean
@InboundChannelAdapter(value = Source.OUTPUT,
 poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "2"))
public MessageSource test() {
 return () -> {
 Map map = new HashMap<>(1);
 map.put("type", "dog");
 return new GenericMessage<>("abcdef", map);
 };
}

作用:

用於消費消息

condition的作用:符合條件,才進入處理方法。

condition起作用的兩個條件:

•註解的方法沒有返回值

•方法是一個獨立方法,不支持Reactive API

SendTo(messaging)

示例:

// 接收INPUT這個channel的消息,並將返回值發送到OUTPUT這個channel
@StreamListener(Sink.INPUT)
@SendTo(Source.OUTPUT)
public String receive(String receiveMsg) {
 return "handle...";
}

作用:

用於發送消息

InboundChannelAdapter(Integration)

示例:

@Bean
@InboundChannelAdapter(value = Source.OUTPUT,
 poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
public MessageSource test() {
 return () -> new GenericMessage<>("Hello Spring Cloud Stream");
}

作用:

表示讓定義的方法生產消息。

注:用 InboundChannelAdapter 註解的方法上即使有參數也沒用。即下面test方法不要有參數。

•fixedDelay:多少毫秒發送1次

•maxMessagesPerPoll:一次發送幾條消息。

ServiceActivator(Integration)

示例:

@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)
public String transform(String payload) {
 return payload.toUpperCase();
}

作用:

表示方法能夠處理消息或消息有效內容,監聽input消息,用方法體的代碼處理,然後輸出到output中。

Transformer(Integration)

示例:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(String message) {
 return message.toUpperCase();
}

作用:

和 ServiceActivator 類似,表示方法能夠轉換消息,消息頭,或消息有效內容

PollableMessageSource(Stream)

示例代碼:

@SpringBootApplication
@EnableBinding({ConsumerApplication.PolledProcessor.class})
@EnableScheduling
public class ConsumerApplication {
 public static void main(String[] args) {
 SpringApplication.run(ConsumerApplication.class, args);
 }
 @Autowired
 private PolledProcessor polledProcessor;
 @Scheduled(fixedDelay = 5_000)
 public void poll() {
 polledProcessor.input().poll(message -> {
 byte[] bytes = (byte[]) message.getPayload();
 String payload = new String(bytes);
 System.out.println(payload);
 });
 }
 public interface PolledProcessor {
 @Input
 PollableMessageSource input();
 @Output
 MessageChannel output();
 }
 @Bean
 @InboundChannelAdapter(value = "output",
 poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
 public MessageSource test() {
 return () -> {
 Map map = new HashMap<>(1);
 map.put("type", "dog");
 return new GenericMessage<>("adfdfdsafdsfa", map);
 };
 }
}

如果不想自己做byte數組轉換,可以添加配置:

spring:
 cloud:
 stream:
 bindings:
 output:
 # 指定content-type
 content-type: text/plain

作用:

允許消費者控制消費速率。

相關文章:

https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers[1]

乾貨分享

最近將個人學習筆記整理成冊,使用PDF分享。關注我,回覆如下代碼,即可獲得百度盤地址,無套路領取!

•001:《Java併發與高併發解決方案》學習筆記;

•002:《深入JVM內核——原理、診斷與優化》學習筆記;

•003:《Java面試寶典》

•004:《Docker開源書》

•005:《Kubernetes開源書》

•006:《DDD速成(領域驅動設計速成)》


分享到:


相關文章: