RabbitMQ消息可靠性传输解决方案

MQ / 2023-03-21
0 478

一、消息丢失

常见场景
● publisher到exchange过程中发生丢失
● exchange分发给queue发生丢失
● queue投递到customer发生丢失

1、【生产者发送丢失】publisher到exchange过程中发生丢失

原因

生产者发出消息后由于网络等原因服务端没收到

解决方案

1.1、开启RabbitMQ事务机制
生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消 息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit,类似我们数据库数据库事务机制。
1.2、开启confirm机制
在生产者端设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 ID,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息已经收到。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且可以结合这个机制在自己业务里维护每个消息 ID 的状态,如果超过一定时间还没接收到这个消息的回调,那么可以业务主动重发。
事务机制和 confirm 机制优劣:
事务机制是同步的,提交一个事务之后会阻塞,吞吐量会下来,耗性能。
confirm 机制是异步的,流程不会阻塞,吞吐量较高,性能较好

2、exchange分发给queue发生丢失

原因

RabbitMq收到生产者的消息后还没有来得及持久化到磁盘,又或者创建队列没有持久化以及消息并没有设置为持久化,在Mq故障宕机后都会有消息丢失的情况。

解决方案

2.1、消息持久化
2.2、队列持久化

3、queue投递到customer发生丢失

原因

由于网络中断导致消费者没有收到消息;或者由于rabbitmq自动ack机制造成的消费端故障但是ack了。

解决方案

3.1、取消自动ack,改为消费端手动ack

总体方案:
1)、生产方发送消息时,要try...catch,在catch中捕获异常,并将MQ发送的关键内容记录到日志表中,日志表中要有消息发送状态,若发送失败,由定时任务定期扫描重发并更新状态;
2)、生产方publisher必须要加入确认回调机制,确认成功发送并签收的消息,如果进入失败回调方法,就修改数据库消息的状态,等待定时任务重发;
3)、消费方要开启手动签收ACK机制,消费成功才将消息移除,失败或因异常情况而尚未处理,就重新入队。
其实这就是前面阐述两个概念时已经讲过的内容,也是接近100%消息投递的企业级方案之一,主要目的就是为了解决消息丢失的问题。

二、消息阻塞

原因

1、消费者挂掉;2、消费者处理不过来

解决方案

1、扩展消费者;
2、将消息持久化,慢慢消费;

三、消息重复消费

原因

1、消费者由于网络波动导致消息ack失败,rabbitmq把消息重新放到了队列中;2、消息消费失败,retry机制导致消息重新发送

解决方案

1、消费者做好接口幂等处理
2、将消息唯一ID记录到消息日志表中,来消息了先判断
3、利用rabbitmq自带的getRedelivered()方法,这是用来判断是否是重复发送过来的
推荐第一种:做幂等。2需要一次db,3 第一次失败第二次成功会被拒绝掉

总结

1、三个问题中主要解决的是消息丢失。大部分公司遇不到消息积压的问题,而幂等性也不仅仅是rabbitmq才需要解决的,不使用rabbitmq照样需要做好接口幂等性,所以重复消费也不是大问题。
2、消息丢失最常见的解决方案就是定时任务补偿。不论是SOA架构还是微服务架构,都会有分布式任务调度存在,这就是rabbitmq最直接的补偿方式,
3、慎用生产者消息确认机制【confirm/return】,性能会下降。建立后台管理人工补偿。
4、一定手动签收消息
5、手动签收在finally中做,catch记录日志,定时补偿