12.08 中间件RabbitMQ知识(干货)

基本原理:

MQ全称为Message Queue,是一种分布式应用程序的的通信方法,它是消费-生产者模型的一个典型的代表,producer往消息队列中不断写入消息,而另一端consumer则可以读取或者订阅队列中的消息。

RabbitMQ是MQ产品的典型代表,是一款基于AMQP协议可复用的企业消息系统。业务上,可以实现服务提供者和消费者之间的数据解耦,提供高可用性的消息传输机制,在实际生产中应用相当广泛。本文意在介绍Rabbitmq的基本原理,包括rabbitmq基本框架,概念,通信过程等。


中间件RabbitMQ知识(干货)



为什么选择RabbitMQ?

  • 除了Qpid,RabbitMQ是唯一一个实现了AMQP标准的消息服务器;
  • 可靠性,RabbitMQ的持久化支持,保证了消息的稳定性;
  • 高并发,RabbitMQ使用了Erlang开发语言,Erlang是为电话交换机开发的语言,天生自带高并发光环,和高可用特性;
  • 集群部署简单,正是应为Erlang使得RabbitMQ集群部署变的超级简单;
  • 社区活跃度高,根据网上资料来看,RabbitMQ也是首选;


  • 中间件RabbitMQ知识(干货)

    ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用;

    Channel(信道):消息推送使用的通道;

    Exchange(交换器):用于接受、分配消息;

    三种常见的交换器:direct(发布与订阅,完全匹配)、fanout(广播)、topic(主题,规则匹配)

    Queue(队列):用于存储生产者的消息;

    RoutingKey(路由键):用于把生成者的数据分配到交换器上;


    RabbitMQ使用场景:

    1.异步处理
    场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式


    中间件RabbitMQ知识(干货)


    a、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。

    中间件RabbitMQ知识(干货)


    b、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的

    中间件RabbitMQ知识(干货)

    用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。

    中间件RabbitMQ知识(干货)

    2.应用解耦


    中间件RabbitMQ知识(干货)

    订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功

    库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作

    假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

    3.流量削锋


    中间件RabbitMQ知识(干货)

    用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
    秒杀业务根据消息队列中的请求信息,再做后续处理


    RabbitMQ消息持久化:

    RabbitMQ队列和交换器有一个不可告人的秘密,就是默认情况下重启服务器会导致消息丢失,那么怎么保证RabbitMQ在重启的时候不丢失呢?

    答案就是消息持久化。

    当你把消息发送到Rabbit服务器的时候,你需要选择你是否要进行持久化,但这并不能保证Rabbit能从崩溃中恢复,想要Rabbit消息能恢复必须满足3个条件:

    1. 投递消息的时候durable设置为true,消息持久化,代码:channel.queueDeclare(x, true, false, false, null),参数2设置为true持久化;
    2. 设置投递模式deliveryMode设置为2(持久),代码:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),参数3设置为存储纯文本到磁盘;
    3. 消息已经到达持久化交换器上;
    4. 消息已经到达持久化的队列;

    持久化工作原理

    Rabbit会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费之后,Rabbit会把这条消息标识为等待垃圾回收。

    持久化的缺点

    消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用SSD硬盘可以使事情得到缓解,但他仍然吸干了RabbitMQ的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。



    消息 从生产者发送到交换机

    中间网络断开怎么办?

    设置信道channel 为事务模式

    通过channel.txSelect 开启事务,channel.txCommit 提交事务,channel.txRollback 用于事务回滚

    如果在还没有提交事务之前,RabbitMQ抛出异常,我们可以 将其捕获,然后进行事务回滚。缺点是 事务模式会极大的消耗RabbitMQ的性能。

    消息在什么时候确认?

    broker 将在下面的情况中对消息进行 confirm :

    broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return)

    非持久属性的消息到达了其所应该到达的所有 queue 中(和镜像 queue 中)

    持久消息到达了其所应该到达的所有 queue 中(和镜像 queue 中),并被持久化到了磁盘(被 fsync)

    持久消息从其所在的所有 queue 中被 consume 了(如果必要则会被 acknowledge)


    为什么不通过TCP直接发送命令?

    对于操作系统来说创建和销毁TCP会话是非常昂贵的开销,假设高峰期每秒有成千上万条连接,每个连接都要创建一条TCP会话,这就造成了TCP连接的巨大浪费,而且操作系统每秒能创建的TCP也是有限的,因此很快就会遇到系统瓶颈。




    如何保证消息不被重复消费?

    保证消息不被重复消费的关键是保证消息队列的幂等性,这个问题针对业务场景来答分以下几点:

    1.比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

    2.再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。

    3.如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。




    如何解决丢数据的问题?

    1.生产者丢数据

    生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。

    transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。

    然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

    2.消息队列丢数据

    处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

    在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢失

    3.消费者丢数据

    启用手动确认模式可以解决这个问题

    ①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。

    ②手动确认模式,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常;如果对异常进行了捕获,但是没有在finally里ack,也会一直重复发送消息(重试机制)。

    ③不确认模式,acknowledge="none" 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。


    RabbitMQ集群模式分类

    1.普通模式:

    默认的集群模式,假设集群上有两个节点 node1,node2, 消息实体只存在一个节点,如果生产者生产一个消息,丢向node1,但是消费者在node2进行消费,那么就需要node2将node1中的消息取出,并且发送给消费者。

    2. 镜像模式:

    需要将消费者的队列变成镜像队列,存在于多个节点。实现RabbitMQ的高可用,作用就是,消息实体会在队列之间同步。


    生产者想知道消息是否到达了 RabbitMQ 服务器上。这个时候我们应该如何处理

    针对上述问题,RabbitMQ 提供了2中解决方案。

    1.通过事务机制实现(不推荐

    事务机制到发送一条消息之后会使发送端阻塞,在RabbitMQ服务端回应之后,才能继续发送下一条消息,在高并发的情况下可能性能不怎么好,因此不建议使用。

    2.通过发送方确认(publisher confirm)机制实现(推荐

    生产者将channel设置成confirm 模式后,在此channel上发布的消息都会被分派一个唯一的ID(从1开始),当消息不exchange路由到所匹配的队列后,如果消息是持久化的,那么在消息持久化后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息正确到达RabbitMQ了

    • 注意:

    1、上方2中方式不可共存,只能选取一种,如果一个channel上开启事务又开启confirm则会报错。

    2、如果发送消息的交换器没有匹配的队列,那么消息也会丢失。



    以上就是rabbitmq的一些知识

    记录学习,每天进步一点点的橘子大王。

    喜欢就关注我吧。


    中间件RabbitMQ知识(干货)


    分享到:


    相關文章: