一种基于消息入库的RabbitMQ全链路数据不丢失的方法及其系统与流程

未命名 08-07 阅读:58 评论:0

一种基于消息入库的rabbitmq全链路数据不丢失的方法及其系统
技术领域
1.本发明涉及计算机通讯技术领域,特别是一种基于消息入库的rabbitmq全链路数据不丢失的方法及其系统。


背景技术:

2.rabbitmq,它是一套开源(mpl)的消息队列服务软件,是实现了高级消息队列协议(amqp)的开源消息代理软件(亦称面向消息的中间件);该开源的消息队列服务在很多业务系统会使用到,是面向消息的中间件中较为常用的一种技术方案。现有技术中采用rabbitmq,消息从生产端到消费端消费要经过3个步骤,如图1所示,即:(1)生产端发送消息到rabbitmq;(2)rabbitmq发送消息到消费端;(3)消费端消费这条消息;这3个步骤中的每一步都有可能导致消息丢失,如果是重要的消息,比如支付回调,就需要尽量确保消息的可靠性和不丢失,这个就是本发明所解决的问题。
3.从生产端投递的消息丢失的原因有很多,比如消息在网络传输的过程中发生网络故障消息丢失,或者消息投递到rabbitmq时rabbitmq挂了。针对以上情况,rabbitmq本身提供了一些机制,比如事务消息机制,但是该机制会严重降低性能,所以一般不采用这种方法,而采用另一种轻量级的解决方案——confirm消息确认机制。然而确认机制也不是最终方案,因为rabbitmq收到消息还没来得及将消息持久化到硬盘时,rabbitmq挂了,这样消息还是会丢失;或者rabbitmq在发送确认消息给生产端的过程中,由于网络故障而导致生产端没有收到确认消息,这样生产端就不知道rabbitmq到底有没有收到消息,就不好做接下来的处理。所以除了rabbitmq提供的一些机制外,需要另外实现一套消息补偿机制,以应对一些极端情况。


技术实现要素:

4.为克服上述问题,本发明的目的是提供一种基于消息入库的rabbitmq全链路数据不丢失的方法,采用消息入库,将要发送的消息保存到数据库实现rabbitmq全链路数据不丢失。
5.本发明采用以下方案实现:一种基于消息入库的rabbitmq全链路数据不丢失的方法,其特征在于:所述方法包括生产端的消息入库处理机制,该消息入库处理机制具体包括如下步骤:
6.步骤s1、在发送消息前先将消息保存到消息数据库中,消息中初始化状态字段status=0,表示生产端将消息发送给了rabbitmq但还没收到确认;
7.步骤s2、生产端从消息数据库取得待推送给rabbitmq的消息后,在生产端消息确认监听后将status设为1,表示rabbitmq已收到消息;
8.步骤s3、在生产端设置一个定时器,定时检索消息表,将status=0并且超过固定时间后还没收到确认的消息取出重发;
9.步骤s4、rabbitmq发送收到的消息到消费端。
10.进一步的,所述步骤s3进一步具体为:消息取出重发会存在失败的情况,则设置最大重发次数,当超过最大重发次数就做异步处理,该异步处理具体为:判断重发次数是否超过设定的次数,是,则调用短信通道发送短信将消息发送给消费端,否,记录日志,进行继续重发消息。
11.进一步的,从消息数据库取得待推送给rabbitmq的消息,该消息的status标志默认状态是0,发送消息并确认成功后将该消息的status标志设置为1;通过异步的方式确保status标志为0的消息都能推送给消息队列rabbitmq,实现消息不丢失。
12.进一步的,还包括采用手动ack机制解决消费端消息丢失,该手动ack机制解决消费端消息丢失具体为:将自动ack机制改为手动ack机制,把ack机制中autoack参数置为false,对于rabbitmq服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费端的消息;另一部分是已经投递给消费端,但是还没有收到消费端确认信号的消息;如果rabbitmq一直没有收到消费端的确认信号,并且消费此消息的消费端已经断开连接或宕机,则rabbitmq会安排该消息重新进入队列,且将该消息放在队列头部,等待投递给下一个消费者。
13.本发明还提供了一种基于消息入库的rabbitmq全链路数据不丢失的系统,所述系统采用生产端的消息入库处理机制,该消息入库处理机制包括消息入库模块、监听确认模块、定时器设置模块、以及发送消息模块;
14.所述消息入库模块,用于在发送消息前先将消息保存到消息数据库中,消息中初始化状态字段status=0,表示生产端将消息发送给了rabbitmq但还没收到确认;
15.所述监听确认模块,用于生产端从消息数据库取得待推送给rabbitmq的消息后,在生产端消息确认监听后将status设为1,表示rabbitmq已收到消息;
16.所述定时器设置模块,用于在生产端设置一个定时器,定时检索消息表,将status=0并且超过固定时间后还没收到确认的消息取出重发;
17.所述发送消息模块,用于rabbitmq发送收到的消息到消费端。
18.进一步的,所述定时器设置模块的实现方式进一步具体为:消息取出重发会存在失败的情况,则设置最大重发次数,当超过最大重发次数就做异步处理,该异步处理具体为:判断重发次数是否超过设定的次数,是,则调用短信通道发送短信将消息发送给消费端,否,记录日志,进行继续重发消息。
19.进一步的,从消息数据库取得待推送给rabbitmq的消息,该消息的status标志默认状态是0,发送消息并确认成功后将该消息的status标志设置为1;通过异步的方式确保status标志为0的消息都能推送给消息队列rabbitmq,实现消息不丢失。
20.进一步的,所述系统还采用手动ack机制解决消费端消息丢失,该手动ack机制解决消费端消息丢失具体为:将自动ack机制改为手动ack机制,把ack机制中autoack参数置为false,对于rabbitmq服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费端的消息;另一部分是已经投递给消费端,但是还没有收到消费端确认信号的消息;如果rabbitmq一直没有收到消费端的确认信号,并且消费此消息的消费端已经断开连接或宕机,则rabbitmq会安排该消息重新进入队列,且将该消息放在队列头部,等待投递给下一个消费者。
21.本发明的有益效果在于:本发明采用了消息入库和“异步补偿”的机制,如添加定时器和异步处理,以解决可能出现网络故障而导致生产端没有收到确认消息的问题;同时通过调整ack机制解决消费端消息丢失问题,确保从生产端到rabbitmq再到消费端的全链路数据的不丢失;并且具备可靠、简单、灵活的特点。
附图说明
22.图1是现有技术中消息从生产端到消费端消费的示意图。
23.图2是本发明的方法流程示意图。
24.图3是本发明一实施例的消息入库机制示意图。
25.图4是本发明一实施例的异步处理的示意图。
26.图5是本发明一实施例的手动ack机制的示意图。
27.图6是本发明的系统原理框图。
具体实施方式
28.下面结合附图对本发明做进一步说明。
29.本发明一种基于消息入库的rabbitmq全链路数据不丢失的方法,设计思路主要是采用消息入库和“异步补偿”的机制,如添加定时器和异步处理模块,以解决可能出现网络故障而导致生产端没有收到确认消息的问题;同时通过调整ack机制解决消费端消息丢失问题,确保从生产端到rabbitmq再到消费端的全链路数据的不丢失。针对极端情况,如网络异常或宕机,通过异步补偿的设计思路解决,达成一定程度上消息数据不丢失的目标。
30.如图2所示,本发明的一种基于消息入库的rabbitmq全链路数据不丢失的方法,其特征在于:所述方法包括生产端的消息入库处理机制,该消息入库处理机制具体包括如下步骤:
31.步骤s1、在发送消息前先将消息保存到消息数据库中,消息中初始化状态字段status=0,表示生产端将消息发送给了rabbitmq但还没收到确认;
32.步骤s2、生产端从消息数据库取得待推送给rabbitmq的消息后,在生产端消息确认监听后将status设为1,表示rabbitmq已收到消息;
33.步骤s3、在生产端设置一个定时器,定时检索消息表,将status=0并且超过固定时间后还没收到确认的消息取出重发;
34.步骤s4、rabbitmq发送收到的消息到消费端。
35.下面结合一具体实施例对本发明做进一步说明:
36.如图3所示,本发明的一种基于消息入库的rabbitmq全链路数据不丢失的方法,实现分2个阶段:生产端和消费端,比较核心的实现是解决生产端可靠性投递,确保不丢失消息,采用的做法是消息入库,具体细节如下:
37.(1)在发送消息前先将消息保存到消息数据库中,初始化状态字段status=0,表示生产端将消息发送给了rabbitmq但还没收到确认;
38.(2)生产端从消息数据库取得待推送给rabbitmq的消息后,在生产端收到确认后将status设为1,表示rabbitmq已收到消息。
39.(3)因为有可能出现网络故障而导致生产端没有收到确认消息的情况,所以生产
端这边开一个定时器,定时检索消息表,将status=0并且超过固定时间后还没收到确认的消息取出重发(第二种情况下这里会造成消息重复,消费者端要做幂等性),可能重发还会失败,所以可以设计个最大重发次数,超过就做另外的处理(走其他异步处理)。即消息取出重发会存在失败的情况,则设置最大重发次数,如次数为10次,当超过最大重发次数就做异步处理,如图4所示,该异步处理具体为:判断重发次数是否超过设定的次数,是,则调用短信通道发送短信将消息发送给消费端,否,记录日志,进行继续重发消息。
40.(4)rabbitmq发送收到的消息到消费端;
41.(5)消费端对消息进行消费处理。
42.本发明从消息数据库取得待推送给rabbitmq的消息,该消息的status标志默认状态是0,发送消息并确认成功后将该消息的status标志设置为1;通过异步的方式确保status标志为0的消息都能推送给消息队列rabbitmq,实现消息不丢失。
43.如上步骤,能实现消息就可靠地投递到rabbitmq中,而生产端也可以感知到。
44.消费端的保障,技术方案是采用手动ack机制解决消费端消息丢失问题,方法说明如下:
45.rabbitmq的自动ack机制,即默认rabbitmq在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时rabbitmq自己又没有这条消息。
46.如图5所示,本发明采用将自动ack机制改为手动ack机制,把autoack参数置为false,对于rabbitmq服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费端的消息;一部分是已经投递给消费端,但是还没有收到消费端确认信号的消息。如果rabbitmq一直没有收到消费端的确认信号,并且消费此消息的消费端已经断开连接或宕机(rabbitmq会自己感知到),则rabbitmq会安排该消息重新进入队列(放在队列头部),等待投递给下一个消费者,当然也有可能还是原来的那个消费端,消费端需要确保幂等性。
47.如图6所示,本发明还提供了一种基于消息入库的rabbitmq全链路数据不丢失的系统,所述系统采用生产端的消息入库处理机制,该消息入库处理机制包括消息入库模块、监听确认模块、定时器设置模块、以及发送消息模块;
48.所述消息入库模块,用于在发送消息前先将消息保存到消息数据库中,消息中初始化状态字段status=0,表示生产端将消息发送给了rabbitmq但还没收到确认;
49.所述监听确认模块,用于生产端从消息数据库取得待推送给rabbitmq的消息后,在生产端消息确认监听后将status设为1,表示rabbitmq已收到消息;
50.所述定时器设置模块,用于在生产端设置一个定时器,定时检索消息表,将status=0并且超过固定时间后还没收到确认的消息取出重发(第二种情况下这里会造成消息重复,消费者端要做幂等性);所述定时器设置模块的实现方式进一步具体为:消息取出重发会存在失败的情况,则设置最大重发次数,当超过最大重发次数就做异步处理,该异步处理具体为:判断重发次数是否超过设定的次数(如次数设置为8次),是,则调用短信通道发送短信将消息发送给消费端,否,记录日志,进行继续重发消息。
51.所述发送消息模块,用于rabbitmq发送收到的消息到消费端。
52.在本发明中,从消息数据库取得待推送给rabbitmq的消息,该消息的status标志默认状态是0,发送消息并确认成功后将该消息的status标志设置为1;通过异步的方式确
保status标志为0的消息都能推送给消息队列rabbitmq,实现消息不丢失。
53.所述系统还采用手动ack机制解决消费端消息丢失,该手动ack机制解决消费端消息丢失具体为:将自动ack机制改为手动ack机制,把ack机制中autoack参数置为false,对于rabbitmq服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费端的消息;另一部分是已经投递给消费端,但是还没有收到消费端确认信号的消息;如果rabbitmq一直没有收到消费端的确认信号,并且消费此消息的消费端已经断开连接或宕机,则rabbitmq会安排该消息重新进入队列(rabbitmq会自己感知到),且将该消息放在队列头部,等待投递给下一个消费者。
54.总之,本发明采用了消息入库和“异步补偿”的机制,如添加定时器和异步处理,以解决可能出现网络故障而导致生产端没有收到确认消息的问题;同时通过调整ack机制解决消费端消息丢失问题,确保从生产端到rabbitmq再到消费端的全链路数据的不丢失;并且具备可靠、简单、灵活的特点。
55.以上所述仅为本发明的较佳实施例,凡依本发明申请专利范围所做的均等变化与修饰,皆应属本发明的涵盖范围。

技术特征:
1.一种基于消息入库的rabbitmq全链路数据不丢失的方法,其特征在于:所述方法包括生产端的消息入库处理机制,该消息入库处理机制具体包括如下步骤:步骤s1、在发送消息前先将消息保存到消息数据库中,消息中初始化状态字段status=0,表示生产端将消息发送给了rabbitmq但还没收到确认;步骤s2、生产端从消息数据库取得待推送给rabbitmq的消息后,在生产端消息确认监听后将status设为1,表示rabbitmq已收到消息;步骤s3、在生产端设置一个定时器,定时检索消息表,将status=0并且超过固定时间后还没收到确认的消息取出重发;步骤s4、rabbitmq发送收到的消息到消费端。2.根据权利要求1所述的一种基于消息入库的rabbitmq全链路数据不丢失的方法,其特征在于:所述步骤s3进一步具体为:消息取出重发会存在失败的情况,则设置最大重发次数,当超过最大重发次数就做异步处理,该异步处理具体为:判断重发次数是否超过设定的次数,是,则调用短信通道发送短信将消息发送给消费端,否,记录日志,进行继续重发消息。3.根据权利要求1所述的一种基于消息入库的rabbitmq全链路数据不丢失的方法,其特征在于:从消息数据库取得待推送给rabbitmq的消息,该消息的status标志默认状态是0,发送消息并确认成功后将该消息的status标志设置为1;通过异步的方式确保status标志为0的消息都能推送给消息队列rabbitmq,实现消息不丢失。4.根据权利要求1所述的一种基于消息入库的rabbitmq全链路数据不丢失的方法,其特征在于:还包括采用手动ack机制解决消费端消息丢失,该手动ack机制解决消费端消息丢失具体为:将自动ack机制改为手动ack机制,把ack机制中autoack参数置为false,对于rabbitmq服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费端的消息;另一部分是已经投递给消费端,但是还没有收到消费端确认信号的消息;如果rabbitmq一直没有收到消费端的确认信号,并且消费此消息的消费端已经断开连接或宕机,则rabbitmq会安排该消息重新进入队列,且将该消息放在队列头部,等待投递给下一个消费者。5.一种基于消息入库的rabbitmq全链路数据不丢失的系统,其特征在于:所述系统采用生产端的消息入库处理机制,该消息入库处理机制包括消息入库模块、监听确认模块、定时器设置模块、以及发送消息模块;所述消息入库模块,用于在发送消息前先将消息保存到消息数据库中,消息中初始化状态字段status=0,表示生产端将消息发送给了rabbitmq但还没收到确认;所述监听确认模块,用于生产端从消息数据库取得待推送给rabbitmq的消息后,在生产端消息确认监听后将status设为1,表示rabbitmq已收到消息;所述定时器设置模块,用于在生产端设置一个定时器,定时检索消息表,将status=0并且超过固定时间后还没收到确认的消息取出重发;所述发送消息模块,用于rabbitmq发送收到的消息到消费端。6.根据权利要求5所述的一种基于消息入库的rabbitmq全链路数据不丢失的系统,其特征在于:所述定时器设置模块的实现方式进一步具体为:消息取出重发会存在失败的情况,则设置最大重发次数,当超过最大重发次数就做异步处理,该异步处理具体为:判断重
发次数是否超过设定的次数,是,则调用短信通道发送短信将消息发送给消费端,否,记录日志,进行继续重发消息。7.根据权利要求5所述的一种基于消息入库的rabbitmq全链路数据不丢失的系统,其特征在于:从消息数据库取得待推送给rabbitmq的消息,该消息的status标志默认状态是0,发送消息并确认成功后将该消息的status标志设置为1;通过异步的方式确保status标志为0的消息都能推送给消息队列rabbitmq,实现消息不丢失。8.根据权利要求5所述的一种基于消息入库的rabbitmq全链路数据不丢失的系统,其特征在于:所述系统还采用手动ack机制解决消费端消息丢失,该手动ack机制解决消费端消息丢失具体为:将自动ack机制改为手动ack机制,把ack机制中autoack参数置为false,对于rabbitmq服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费端的消息;另一部分是已经投递给消费端,但是还没有收到消费端确认信号的消息;如果rabbitmq一直没有收到消费端的确认信号,并且消费此消息的消费端已经断开连接或宕机,则rabbitmq会安排该消息重新进入队列,且将该消息放在队列头部,等待投递给下一个消费者。

技术总结
本发明提供了一种基于消息入库的RabbitMQ全链路数据不丢失的方法,设计思路主要是采用消息入库和“异步补偿”的机制,如添加定时器和异步处理模块,以解决可能出现网络故障而导致生产端没有收到确认消息的问题;同时通过调整ack机制解决消费端消息丢失问题,确保从生产端到RabbitMQ再到消费端的全链路数据的不丢失。针对极端情况,如网络异常或宕机,通过异步补偿的设计思路解决,达成一定程度上消息数据不丢失的目标。消息数据不丢失的目标。消息数据不丢失的目标。


技术研发人员:刘德建 李佳 郑彬
受保护的技术使用者:福建天晴数码有限公司
技术研发日:2022.01.26
技术公布日:2023/8/5
版权声明

本文仅代表作者观点,不代表航家之家立场。
本文系作者授权航家号发表,未经原创作者书面授权,任何单位或个人不得引用、复制、转载、摘编、链接或以其他任何方式复制发表。任何单位或个人在获得书面授权使用航空之家内容时,须注明作者及来源 “航空之家”。如非法使用航空之家的部分或全部内容的,航空之家将依法追究其法律责任。(航空之家官方QQ:2926969996)

航空之家 https://www.aerohome.com.cn/

飞机超市 https://mall.aerohome.com.cn/

航空资讯 https://news.aerohome.com.cn/

分享:

扫一扫在手机阅读、分享本文

相关推荐