基本原理:
MQ全称为Message Queue,是一种分布式应用程序的的通信方法,它是消费-生产者模型的一个典型的代表,producer往消息队列中不断写入消息,而另一端consumer则可以读取或者订阅队列中的消息。
RabbitMQ是MQ产品的典型代表,是一款基于AMQP协议可复用的企业消息系统。业务上,可以实现服务提供者和消费者之间的数据解耦,提供高可用性的消息传输机制,在实际生产中应用相当广泛。本文意在介绍Rabbitmq的基本原理,包括rabbitmq基本框架,概念,通信过程等。
为什么选择RabbitMQ?
ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用;
Channel(信道):消息推送使用的通道;
Exchange(交换器):用于接受、分配消息;
三种常见的交换器:direct(发布与订阅,完全匹配)、fanout(广播)、topic(主题,规则匹配)
Queue(队列):用于存储生产者的消息;
RoutingKey(路由键):用于把生成者的数据分配到交换器上;
RabbitMQ使用场景:
1.异步处理
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式
a、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
b、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的
用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。
2.应用解耦
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
3.流量削锋
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
秒杀业务根据消息队列中的请求信息,再做后续处理
RabbitMQ消息持久化:
RabbitMQ队列和交换器有一个不可告人的秘密,就是默认情况下重启服务器会导致消息丢失,那么怎么保证RabbitMQ在重启的时候不丢失呢?
答案就是消息持久化。
当你把消息发送到Rabbit服务器的时候,你需要选择你是否要进行持久化,但这并不能保证Rabbit能从崩溃中恢复,想要Rabbit消息能恢复必须满足3个条件:
- 投递消息的时候durable设置为true,消息持久化,代码:channel.queueDeclare(x, true, false, false, null),参数2设置为true持久化;
- 设置投递模式deliveryMode设置为2(持久),代码:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),参数3设置为存储纯文本到磁盘;
- 消息已经到达持久化交换器上;
- 消息已经到达持久化的队列;
持久化工作原理
Rabbit会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费之后,Rabbit会把这条消息标识为等待垃圾回收。
持久化的缺点
消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用SSD硬盘可以使事情得到缓解,但他仍然吸干了RabbitMQ的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。
消息 从生产者发送到交换机
中间网络断开怎么办?
设置信道channel 为事务模式
通过channel.txSelect 开启事务,channel.txCommit 提交事务,channel.txRollback 用于事务回滚
如果在还没有提交事务之前,RabbitMQ抛出异常,我们可以 将其捕获,然后进行事务回滚。缺点是 事务模式会极大的消耗RabbitMQ的性能。
消息在什么时候确认?
broker 将在下面的情况中对消息进行 confirm :
broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return)
非持久属性的消息到达了其所应该到达的所有 queue 中(和镜像 queue 中)
持久消息到达了其所应该到达的所有 queue 中(和镜像 queue 中),并被持久化到了磁盘(被 fsync)
持久消息从其所在的所有 queue 中被 consume 了(如果必要则会被 acknowledge)
为什么不通过TCP直接发送命令?
对于操作系统来说创建和销毁TCP会话是非常昂贵的开销,假设高峰期每秒有成千上万条连接,每个连接都要创建一条TCP会话,这就造成了TCP连接的巨大浪费,而且操作系统每秒能创建的TCP也是有限的,因此很快就会遇到系统瓶颈。
如何保证消息不被重复消费?
保证消息不被重复消费的关键是保证消息队列的幂等性,这个问题针对业务场景来答分以下几点:
1.比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
2.再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
3.如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将
如何解决丢数据的问题?
1.生产者丢数据
生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。
然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
2.消息队列丢数据
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢失
3.消费者丢数据
启用手动确认模式可以解决这个问题
①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。
②手动确认模式,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常;如果对异常进行了捕获,但是没有在finally里ack,也会一直重复发送消息(重试机制)。
③不确认模式,acknowledge="none" 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。
RabbitMQ集群模式分类
1.普通模式:
默认的集群模式,假设集群上有两个节点 node1,node2, 消息实体只存在一个节点,如果生产者生产一个消息,丢向node1,但是消费者在node2进行消费,那么就需要node2将node1中的消息取出,并且发送给消费者。
2. 镜像模式:
需要将消费者的队列变成镜像队列,存在于多个节点。实现RabbitMQ的高可用,作用就是,消息实体会在队列之间同步。
生产者想知道消息是否到达了 RabbitMQ 服务器上。这个时候我们应该如何处理?
针对上述问题,RabbitMQ 提供了2中解决方案。
1.通过事务机制实现(不推荐)
事务机制到发送一条消息之后会使发送端阻塞,在RabbitMQ服务端回应之后,才能继续发送下一条消息,在高并发的情况下可能性能不怎么好,因此不建议使用。
2.通过发送方确认(publisher confirm)机制实现(推荐)
生产者将channel设置成confirm 模式后,在此channel上发布的消息都会被分派一个唯一的ID(从1开始),当消息不exchange路由到所匹配的队列后,如果消息是持久化的,那么在消息持久化后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息正确到达RabbitMQ了
- 注意:
1、上方2中方式不可共存,只能选取一种,如果一个channel上开启事务又开启confirm则会报错。
2、如果发送消息的交换器没有匹配的队列,那么消息也会丢失。
以上就是rabbitmq的一些知识
记录学习,每天进步一点点的橘子大王。
喜欢就关注我吧。
閱讀更多 是橘子大王o 的文章