10.22 如何搭建消息中间件应用框架之SpringCloud Stream


目录

  1. 前言
  2. 什么是Spring Messaging
  3. 什么是Sping Integration
  4. SpringCloud Stream
  5. 绑定器
  6. 发布订阅
  7. 消费组
  8. 消费分区
  9. 总结

前言

官方对 Spring Cloud Stream 的一段介绍:Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。其目的是为了简化消息在 Spring Cloud 应用程序中的开发。

老顾来翻译一下,就是现在的消息中间件比较多,如:RabbitMQ、Kafka、RocketMq等;使用方法也不一样,但是他们的本质流程是一样,都有发布/订阅、消费组以及消息分区这三个核心概念。

所以SpringCloud就实现了一套轻量级的消息驱动的微服务框架

;通过使用 Spring Cloud Stream,可以忽略消息中间件的差异,有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。

老顾先带着小伙伴们了解几个概念,这样会更方便理解。

什么是 Spring Messaging

Spring Messaging是Spring Framework中的一个模块,其作用就是统一消息的编程模型

比如消息Messaging对应的模型就包括一个消息体Payload和消息头Header


如何搭建消息中间件应用框架之SpringCloud Stream

如何搭建消息中间件应用框架之SpringCloud Stream

  • 消息通道MessageChannel用于接收消息,调用send方法可以将消息发送至该消息通道
如何搭建消息中间件应用框架之SpringCloud Stream

如何搭建消息中间件应用框架之SpringCloud Stream

消息通道里的消息如何被消费呢?

  • 由消息通道的子接口可订阅的消息通道SubscribableChannel实现,被MessageHandler消息处理器所订阅
如何搭建消息中间件应用框架之SpringCloud Stream

  • 由MessageHandler真正地
    消费/处理消息
如何搭建消息中间件应用框架之SpringCloud Stream

Spring Messaging在消息模型的基础上衍生出了其它的一些功能,如:

1、消息接收参数及返回值处理:消息接收参数处理器HandlerMethodArgumentResolver配合@Header, @Payload等注解使用;消息接收后的返回值处理器HandlerMethodReturnValueHandler配合@SendTo注解使用。

2、消息体内容转换器MessageConverter

3、统一抽象的消息发送模板AbstractMessageSendingTemplate

4、消息通道拦截器ChannelInterceptor

什么是 Spring Integration

Spring Integration是一个功能强大的EIP(Enterprise Intergration Patterns,即企业集成模式)

是对Spring Messaging的扩展,它提出了不少新的概念,包括消息的路由MessageRoute、消息的分发MessageDispatcher、消息的过滤Filter、消息的转换Transformer、消息的聚合Aggregator、消息的分割Splitter等等。

总结一句话就是对消息消费时进行额外的处理

1、消息的分割

如何搭建消息中间件应用框架之SpringCloud Stream

2、消息的聚合

如何搭建消息中间件应用框架之SpringCloud Stream

3、消息的过滤

如何搭建消息中间件应用框架之SpringCloud Stream

4、消息的分发

如何搭建消息中间件应用框架之SpringCloud Stream

我们来看一个例子

如何搭建消息中间件应用框架之SpringCloud Stream

1、步骤一先创建一个可订阅消息通道messageChannel

2、定义一个消息消费者messagehandler,去消费通道里面的消息;用了lammda表达式实现了,简单的

输出一句话

3、步骤三发送一个消息

消息最终被消息通道里的 MessageHandler 所消费,最后控制台打印出:

接收到: 第一条消息内容

我们再进入DirectChannel,内部有一个对象UnicastingDispatcher,这个是消息分发器,会分发到对应的消息通道MessageChannel中;

UnicastingDispatcher 是个单播的分发器只能选择一个消息通道。那么如何选择呢? 内部提供了 LoadBalancingStrategy 负载均衡策略,默认只有轮询的实现,可以进行扩展。

上面的代码改动一下

如何搭建消息中间件应用框架之SpringCloud Stream

由于DirectChannel内部的消息分发器是UnicastingDispatcher单播的方式,并且采用轮询的负载均衡策略,所以这里两次的消费分别对应这两个MessageHandler。控制台打印出:

如何搭建消息中间件应用框架之SpringCloud Stream

如果我们要实现广播方式,也就是BroadcastingDispatcher,它被

PublishSubscribeChannel这个消息通道所使用。广播消息分发器会把消息分发给所有的MessageHandler。

如何搭建消息中间件应用框架之SpringCloud Stream

发送两个消息,都被所有的MessageHandler所消费。控制台打印:

如何搭建消息中间件应用框架之SpringCloud Stream

Spring Cloud Stream

SpringCloud Stream(以下简称SCS)在 Spring Integration 的基础上进行了封装,提出了Binder, Binding, @EnableBinding, @StreamListener 等概念。另外SCS也整合了其他模块

1、与Spring Boot Actuator整合,提供了 /bindings, /channels endpoint

2、与Spring Boot Externalized Configuration 整合,提供了BindingProperties, BinderProperties等外部化配置类

3、SCS增强了消息发送失败的和消费失败情况下的处理逻辑等功能

SCS 是 Spring Integration 的加强,同时与 Spring Boot 体系进行了融合,也是 Spring Cloud Bus 的基础。它屏蔽了底层消息中间件的实现细节,希望以统一的一套API来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的Binder 完成

Binder是提供与外部消息中间件集成的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumer 和 bindProducer ,它们分别用于构造生产者和消费者。目前官方的实现有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 内部已经实现了RocketMQ Binder。

如何搭建消息中间件应用框架之SpringCloud Stream

从图中可以看出,Binding是连接应用程序跟消息中间件的桥梁

,用于消息的消费和生产。

绑定器

Binder绑定器是Spring Cloud Stream中一个非常重要的概念。在没有绑定器这个概念的情况下,我们的Spring Boot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,这使得我们实现的消息交互逻辑就会非常笨重,因为对具体的中间件实现细节有太重的依赖,当中间件有较大的变动升级、或是更换中间件的时候,我们就需要付出非常大的代价来实施。

通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。当我们需要升级消息中间件,或是更换其他消息中间件产品时,我们要做的就是

更换它们对应的Binder绑定器而不需要修改任何Spring Boot的应用逻辑

发布-订阅模式

在Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件之后,它会通过共享的Topic主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。这里所提到的Topic主题是Spring Cloud Stream中的一个抽象概念,用来代表发布共享消息给消费者的地方。

在不同的消息中间件中,Topic可能对应着不同的概念,比如:在RabbitMQ中的它对应了Exchange、而在Kakfa中则对应了Kafka中的Topic

消费组

虽然Spring Cloud Stream通过发布-订阅模式将消息生产者与消费者做了很好的解耦,基于相同主题的消费者可以轻松的进行扩展,但是这些扩展都是针对不同的应用实例而言的

,在现实的微服务架构中,我们每一个微服务应用为了实现高可用和负载均衡,实际上都会部署多个实例。很多情况下,消息生产者发送消息给某个具体微服务时,只希望被消费一次,按照上面我们启动两个应用的例子,虽然它们同属一个应用,但是这个消息出现了被重复消费两次的情况。为了解决这个问题,在Spring Cloud Stream中提供了消费组的概念

如果在同一个主题上的应用需要启动多个实例的时候,我们可以通过spring.cloud.stream.bindings.input.group属性为应用指定一个组名,这样这个应用的多个实例在接收到消息的时候,只会有一个成员真正的收到消息并进行处理。如下图所示,我们为Service-A和Service-B分别启动了两个实例,并且根据服务名进行了分组,这样当消息进入主题之后,Group-A和Group-B都会收到消息的副本,但是在两个组中都只会有一个实例对其进行消费。

如何搭建消息中间件应用框架之SpringCloud Stream

消息分区

通过引入消费组的概念,我们已经能够在多实例的情况下,保障每个消息只被组内一个实例进行消费。通过上面对消费组参数设置后的实验,我们可以观察到,消费组并无法控制消息具体被哪个实例消费。也就是说,对于同一条消息,它多次到达之后可能是由不同的实例进行消费的

。但是对于一些业务场景,就需要对于一些具有相同特征的消息每次都可以被同一个消费实例处理,比如:一些用于监控服务,为了统计某段时间内消息生产者发送的报告内容,监控服务需要在自身内容聚合这些数据,那么消息生产者可以为消息增加一个固有的特征ID来进行分区,使得拥有这些ID的消息每次都能被发送到一个特定的实例上实现累计统计的效果,否则这些数据就会分散到各个不同的节点导致监控结果不一致的情况。

分区概念的引入就是为了解决这样的问题:当生产者将消息数据发送给多个消费者实例时,保证拥有共同特征的消息数据始终是由同一个消费者实例接收和处理

Spring Cloud Stream为分区提供了通用的抽象实现,用来在消息中间件的上层实现分区

处理,所以它对于消息中间件自身是否实现了消息分区并不关心,这使得Spring Cloud Stream为不具备分区功能的消息中间件也增加了分区功能扩展

总结

今天我们介绍了SpringCloud Stream概念,说白了就是SCS就是在各个中间件对象基础上面抽象了一层接口,这样开发人员只要用SCS实现消息发布/订阅,不需要关心到底是用了哪个消息中间件。老顾下一篇就带着小伙伴们快速实战入门。


---End---

最近老顾上传了微服务网关的分享课程,请大家多多支持

1、

2、

3、

4、

5、

6、

7、

8、

9、

10、

11、

12、

13、

14、

15、

16、

17、

18、

19、

20、

21、

22、

23、

24、

25、

26、

27、

28、

29、

30、

31、

32、

33、

34、

35、

36、

37、

38、

39、

40、


分享到:


相關文章: