Spring Cloud Stream 重點與總結

TIPS

•本文基於Spring Cloud Stream 2.2.0.RC1,包含其新特性。

•內容稍微有點亂,但這畢竟是個人學習筆記分享,不是從0到1的手把手系列博客,望知悉。

本文是當初學習Spring Cloud Stream的筆記,最初寫於16年。

原本想開個Spring Cloud Stream系列文章連載,寫Spring Cloud Stream算是個人夙願了——首先這是個人非常喜歡的組件,它屏蔽了各種MQ的差異,統一了編程模型(可以類比成基於MQ通信圈的”Spring Data”);其次個人實體書《Spring Cloud 與 Docker 微服務架構實戰》沒有包含這部分內容也是一大遺憾;更重要的是,這貨細節其實挺多,而且上手是稍微有一點曲線的。

然而,個人已同時在更新 Spring Cloud 系列以及 Spring Cloud Alibaba 系列了,再開一個系列感覺精力跟不上。於是,暫時先對照 Spring Cloud Stream 最新文檔,將內容見到到最新版本,包括新特性。

更新完現有系列後,還是會考慮出一個 Spring Cloud Stream 從入門到精通系列教程。

概念

group

By default, when a group is not specified, Spring Cloud Stream assigns the application to an anonymous and independent single-member consumer group that is in a publish-subscribe relationship with all other consumer groups.

組內只有1個實例消費。如果不設置group,則stream會自動為每個實例創建匿名且獨立的group——於是每個實例都會消費。

組內單次只有1個實例消費,並且會輪詢負載均衡。

In general, it is preferable to always specify a consumer group when binding an application to a given destination.

通常,在將應用程序綁定到給定目標時,最好始終指定使用者組。

partition

One or more producer application instances send data to multiple consumer application instances and ensure that data identified by common characteristics are processed by the same consumer instance.

一個或多個生產者將數據發送到多個消費者,並確保有共同特徵標識的數據由同一個消費者處理。默認是對消息進行hashCode,然後根據分區個數取餘,所以對於相同的消息,總會落到同一個消費者上。

destination binder

與外部消息系統通信的組件,為構造 Binding提供了 2 個方法,分別是 bindConsumer 和 bindProducer ,它們分別用於構造生產者和消費者。Binder使Spring Cloud Stream應用程序可以靈活地連接到中間件,目前spring為kafka、rabbitmq提供binder。

destination binding

Binding 是連接應用程序跟消息中間件的橋樑,用於消息的消費和生產,由binder創建。

Applying the @EnableBinding annotation to one of the application’s configuration classes defines a destination binding.

使用@EnableBinding即可定義destination binding

註解

@Input(“inboundOrders”)

Aside from generating channels for each binding and registering them as Spring beans, for each bound interface, Spring Cloud Stream generates a bean that implements the interface.

public interface Barista {
@Input("inboundOrders")
SubscribableChannel orders();
}

Input註解作用

•為每個binding生成channel實例

•指定channel名稱

•在spring容器中生成一個名為inboundOrders,類型為SubscribableChannel的bean

•在spring容器中生成一個類,實現Barista接口。

@InboundChannelAdapter

示例:

@Bean
@InboundChannelAdapter(value = Source.OUTPUT,
poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
public MessageSource<string> test() {
return () -> new GenericMessage<>("Hello Spring Cloud Stream");
}
/<string>

作用:表示定義的方法能產生消息。用InboundChannelAdapter註解的方法上即使有參數也沒用。

•fixedDelay:多少毫秒發送1次

•maxMessagesPerPoll:一次發送幾條消息。

ServiceActivator

@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)
public String transform(String payload) {
return payload.toUpperCase();
}

表示方法能夠處理消息或消息有效內容,監聽input消息,用方法體的代碼處理,然後輸出到output中。

@Transformer

和ServiceActivator差不多,表示方法能夠轉換消息,消息頭,或消息有效內容

@StreamListener(target = Sink.INPUT, condition = “headers[‘type’]==’bogey’”)

In order to be eligible to support conditional dispatching, a method must satisfy the follow conditions:

•It must not return a value.

•It must be an individual message handling method (reactive API methods are not supported).

condition的作用:符合條件,才走處理方法。

condition起作用的兩個條件:

•註解的方法沒有返回值

•方法是一個獨立方法,不支持Reactive API

代碼示例:

@StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'")
public void handle(String body) {
System.out.println("Received: " + body);
}
@Bean
@InboundChannelAdapter(value = Source.OUTPUT,
poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "2"))
public MessageSource<string> test() {
return () -> {
Map<string> map = new HashMap<>(1);
map.put("type", "dog");
return new GenericMessage<>("abcdef", map);
};
}
/<string>/<string>

PollableMessageSource

作用:允許消費者控制消費速率。

@SpringBootApplication
@EnableBinding({ConsumerApplication.PolledProcessor.class})
@EnableScheduling
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@Autowired
private PolledProcessor polledProcessor;
@Scheduled(fixedDelay = 5_000)
public void poll() {
polledProcessor.input().poll(message -> {
byte[] bytes = (byte[]) message.getPayload();
String payload = new String(bytes);
System.out.println(payload);
});
}
public interface PolledProcessor {
@Input
PollableMessageSource input();
@Output
MessageChannel output();

}
@Bean
@InboundChannelAdapter(value = "output",
poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<string> test() {
return () -> {
Map<string> map = new HashMap<>(1);
map.put("type", "dog");
return new GenericMessage<>("adfdfdsafdsfa", map);
};
}
}
/<string>/<string>

如果不想自己進行byte數組轉換,可以添加配置:

spring:
cloud:
stream:
bindings:
output:
# 指定content-type
content-type: text/plain

相關文章:https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers[1]

錯誤處理

應用處理

方式1:處理指定channel

配置:

spring:
cloud:
stream:
bindings:
input:

destination: my-destination
group: my-group
output:
destination: my-destination

代碼:

@Slf4j
@SpringBootApplication
@EnableBinding({Processor.class})
@EnableScheduling
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@StreamListener(value = Processor.INPUT)
public void handle(String body) {
throw new RuntimeException("x");
}
@ServiceActivator(inputChannel = "my-destination.my-group.errors")
public void handleError(ErrorMessage message) {
Throwable throwable = message.getPayload();
log.error("截獲異常", throwable);
Message> originalMessage = message.getOriginalMessage();
assert originalMessage != null;
log.info("原始消息體 = {}", new String((byte[]) originalMessage.getPayload()));
}
@Bean
@InboundChannelAdapter(value = Processor.OUTPUT,
poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<string> test() {
return () -> new GenericMessage<>("adfdfdsafdsfa");
}
}
/<string>

方式2:處理所有channel

@StreamListener(value = Processor.INPUT)
public void handle(String body) {
throw new RuntimeException("x");
}
@StreamListener("errorChannel")
public void error(Message> message) {
ErrorMessage errorMessage = (ErrorMessage) message;
System.out.println("Handling ERROR: " + errorMessage);

}

系統處理

系統處理方式,因消息中間件不同而異。如果應用沒有配置錯誤處理器,那麼error將會被傳播給binder,binder將error回傳給消息中間件。消息中間件可以丟棄消息、requeue(重新排隊,從而重新處理)或將失敗的消息發送給DLQ(死信隊列)。

丟棄

默認情況下,錯誤消息將被丟棄。雖然在某些情況下可以接受,但這種方式一般不適用於生產。

DLQ

配置:

spring:
cloud:
stream:
bindings:
input:
destination: my-destination
group: my-group
output:
destination: my-destination
rabbit:
bindings:
input:
consumer:
auto-bind-dlq: true

代碼:

@StreamListener(value = Processor.INPUT)
public void handle(String body) {
throw new RuntimeException("x");
}
@Bean
@InboundChannelAdapter(value = Processor.OUTPUT,
poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<string> test() {
return () -> new GenericMessage<>("adfdfdsafdsfa");
}
/<string>

這樣,消息消費失敗後,就會放入死信隊列。在控制檯操作一下,即可將這些消息放回消息隊列。客戶端就可以重新處理。

如果想獲取原始錯誤的異常堆棧,可添加如下配置:

spring:
cloud:
stream:
rabbit:
bindings:
input:
consumer:
republish-to-dlq: true

requeue

Rabbit/Kafka的binder依賴RetryTemplate實現重試,從而提升消息處理的成功率。然而,如果設置了spring.cloud.stream.bindings.input.consumer.max-attempts=1 ,那麼RetryTemplate則不再重試。此時可通過requeue方式處理異常。

添加如下配置:

# 默認是3,設為1則禁用重試
spring.cloud.stream.bindings..consumer.max-attempts=1
# 表示是否要requeue被拒絕的消息(即:requeue處理失敗的消息)
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true

這樣,失敗的消息將會被重新提交到同一個handler進行處理,直到handler拋出 AmqpRejectAndDontRequeueException 異常為止。

RetryTemplate

配置方式

RetryTemplate重試也是錯誤處理的一種手段。

spring:
cloud:
stream:
bindings:
:
consumer:
# 最多嘗試處理幾次,默認3
maxAttempts: 3
# 重試時初始避退間隔,單位毫秒,默認1000
backOffInitialInterval: 1000
# 重試時最大避退間隔,單位毫秒,默認10000
backOffMaxInterval: 10000
# 避退乘數,默認2.0
backOffMultiplier: 2.0
# 當listen拋出retryableExceptions未列出的異常時,是否要重試
defaultRetryable: true
# 異常是否允許重試的map映射

retryableExceptions:
java.lang.RuntimeException: true
java.lang.IllegalStateException: false

測試代碼:

@StreamListener(value = Processor.INPUT)
public void handle(String body) {
throw new RuntimeException(body);
}
private AtomicInteger count = new AtomicInteger(0);
@Bean
@InboundChannelAdapter(value = Processor.OUTPUT,
poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<string> test() {
return () -> new GenericMessage<>(count.getAndAdd(1) + "");
}
/<string>

編碼方式

多數場景下,使用配置方式定製重試行為都是可以滿足需求的,但配置方式可能無法滿足一些複雜需求。此時可使用編碼方式配置RetryTemplate:

@Configuration
class RetryConfiguration {
@StreamRetryTemplate
public RetryTemplate sinkConsumerRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy());
retryTemplate.setBackOffPolicy(backOffPolicy());
return retryTemplate;
}
private ExceptionClassifierRetryPolicy retryPolicy() {
BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier(
Collections.singletonList(IllegalAccessException.class
));
keepRetryingClassifier.setTraverseCauses(true);
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);
AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();
ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
retryPolicy.setExceptionClassifier(

classifiable -> keepRetryingClassifier.classify(classifiable) ?
alwaysRetryPolicy : simpleRetryPolicy);
return retryPolicy;
}
private FixedBackOffPolicy backOffPolicy() {
final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(2);
return backOffPolicy;
}
}

然後添加配置:

spring.cloud.stream.bindings..consumer.retry-template-name=myRetryTemplate

注:

Spring Cloud Stream 2.2才支持設置retry-template-name

動態綁定目標

這是Spring Integration原生的API,建議有時間瞭解下Spring Integration相關文檔。

代碼示例:

@EnableBinding
@Controller
public class SourceWithDynamicDestination {
@Autowired
private BinderAwareChannelResolver resolver;
@Autowired
@Qualifier("sourceChannel")
private MessageChannel localChannel;
@RequestMapping(path = "/", method = POST, consumes = "*/*")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
localChannel.send(
MessageBuilder.createMessage(
body,
new MessageHeaders(
Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType)
)
)
);

}
@Bean(name = "sourceChannel")
public MessageChannel localChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "sourceChannel")
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.id"));
router.setDefaultOutputChannelName("default-output");
router.setChannelResolver(resolver);
return router;
}
//Following sink is used as test consumer. It logs the data received through the consumer.
@EnableBinding(Sink.class)
static class TestSink {
private final Log logger = LogFactory.getLog(getClass());
@StreamListener(Sink.INPUT1)
public void receive(String data) {
logger.info("Data received from customer-1..." + data);
}
@StreamListener(Sink.INPUT2)
public void receiveX(String data) {
logger.info("Data received from customer-2..." + data);
}
}
interface Sink {
String INPUT1 = "input1";
String INPUT2 = "input2";
@Input(INPUT1)
SubscribableChannel input1();
@Input(INPUT2)
SubscribableChannel input2();
}
}

乾貨分享

最近將個人學習筆記整理成冊,使用PDF分享。關注我,回覆如下代碼,即可獲得百度盤地址,無套路領取!

•001:《Java併發與高併發解決方案》學習筆記;

•002:《深入JVM內核——原理、診斷與優化》學習筆記;

•003:《Java面試寶典》

•004:《Docker開源書》

•005:《Kubernetes開源書》

•006:《DDD速成(領域驅動設計速成)》

References

[1]: https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers


分享到:


相關文章: