一种基于MQTT和数据库的消息传输方法及系统与流程

未命名 09-10 阅读:51 评论:0

一种基于mqtt和数据库的消息传输方法及系统
技术领域
1.本发明涉及数据通讯技术领域,具体地说是一种基于mqtt和数据库的消息传输方法及系统。


背景技术:

2.得益于云计算技术的发展和普及,企业服务越来越倾向部署在云端。同时,随着企业服务的拓展,单点服务器逐渐不能满足需求,分布式系统成为首选。对于分布式系统,尤其是涉及到边缘计算时,边缘节点和中心节点间的通信成为新的问题。
3.mqtt(message queuing telemetry transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级协议。该协议构建于tcp/ip协议之上,mqtt最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛应用。
4.mqtt设计了一套保证消息稳定传输的机制,包括消息应答、存储和重传。
5.在这套机制下,提供了三种不同层次qos(quality of service):
6.qos0,at most once,至多一次;
7.qos1,at least once,至少一次;
8.qos2,exactly once,确保只有一次。
9.qos是消息的发送方(sender)和接受方(receiver)之间达成的一个协议:
10.qos0代表,sender发送的一条消息,receiver最多能收到一次。sender尽力向receiver发送消息,如果发送失败,不做处理;
11.qos1代表,sender发送的一条消息,receiver至少能收到一次。sender向receiver发送消息,如果发送失败,会继续重试,直到receiver收到消息为止,但是因为重传的原因,receiver有可能会收到重复的消息;
12.qos2代表,sender发送的一条消息,receiver确保能收到而且只收到一次。sender尽力向receiver发送消息,如果发送失败,会继续重试,直到receiver收到消息为止,同时保证receiver不会因为消息重传而收到重复的消息。
13.qos2的重传和应答机制复杂,同时开销也是最大的。在吞吐量很大的应用场景,例如多种传感器设备实时传输数据的时候,qos2的额外开销影响过大,qos0发送消息的开销小,速度快,更适合这种场景。
14.对于qos0,需要解决消息重复和消息丢失的问题,尤其面对分布式系统的时候,多节点间传输大量信息,一个可以进行消息去重、解决消息冲突、处理消息丢失的系统和方法尤为重要。


技术实现要素:

15.本发明的技术任务是针对以上不足之处,提供一种基于mqtt和数据库的消息传输
方法及系统,能够解决分布式系统中使用qos0机制mqtt所造成的消息重复、消息冲突、消息丢失问题;简化分布式节点间的连接并保证信息的实时可靠。
16.本发明解决其技术问题所采用的技术方案是:
17.一种基于mqtt和数据库的消息传输方法,该方法的实现包括mqtt单元,redis,业务处理单元和关系型数据库;
18.所述mqtt单元包括配置在各节点的mqtt客户端和配置在云端服务器的mqtt服务端,所述mqtt客户端用于各节点发布请求/反馈信息和接收请求/反馈信息,所述mqtt服务端用于转发各节点上mqtt客户端发布的信息;
19.所述redis配置在各节点上,用于对相关信息进行快速缓存,包括存储节点权限、存储请求状态、存储接收的信息与存储反馈信息;
20.所述业务处理单元配置在各节点上,用于对请求信息或反馈信息进行处理、对关系型数据库进行操作;
21.所述关系型数据库配置在各节点上,用于存储该节点的数据信息、请求处理信息,对数据和处理结果进行持久化,便于后期回溯问题、解决问题、总结问题。
22.本方法中,mqtt单元包括用于消息传输的mqtt客户端和mqtt服务端,redis用于存储节点权限、存储请求状态和存储接收的信息,业务处理单元用于执行对应的处理逻辑,关系型数据库用于存储该节点的数据信息、请求处理信息;一次完整的消息传输过程包括请求的发起与记录,请求的接收、记录与处理,请求信息的反馈与处理。通过该方法,弥补现有技术的缺陷,解决分布式系统中使用qos0机制mqtt所造成的消息重复、消息冲突、消息丢失问题,简化分布式节点间的连接并保证信息的实时可靠。
23.优选的,所述redis存储节点权限,包括每个节点能发布、订阅的主题。
24.优选的,所述redis存储节点权限,当中心节点有多个且互相同步,为防止多个中心节点同时接收到反馈信息进行处理,导致后续出现冲突问题,不同的中心节点将负责固定的一个或多个请求主题,在节点权限中记录。
25.进一步的,该方法的具体实现方式如下:
26.步骤1:将请求和mqtt主题进行关联,对应不同请求,约定对应的mqtt主题、mqtt反馈主题、主题标识符,部分解决消息冲突的问题;redis中存储各节点权限,各节点根据权限订阅对应主题;
27.步骤2:两个节点间进行数据传输交互,一次数据传输交互包括请求的发起与记录,请求的接收、记录与处理,请求信息的反馈与处理;消息数据通过mqtt模块进行传递,经过redis进行缓存、通过业务处理模块进行逻辑处理、通过关系型数据库进行持久化,然后将信息反馈给请求的发起方,进行相应的逻辑处理并将该次交互相关信息通过关系型数据库持久化;在这一步中,解决消息冲突与消息重复的问题;
28.步骤3:在特定情况下进行消息重传,解决消息丢失的问题,请求发起方在发出请求信息后,经过等待时间t未收到反馈信息,则重发请求信息;在重发次数达到最大重发次数n后,说明存在网络、硬件问题,该系统中不进行相对的具体处理;此时不进行重发,将该次失败的请求信息存入数据库中,便于其他途径解决问题时进行问题的回溯和原因的排查。这里的等待时间t和最大重发次数n根据具体现实情况进行设定和调整。
29.进一步的,所述步骤2具体包括以下四部分:
30.发起请求:需要发起请求的节点即请求发起方的业务处理模块整理报文有效荷载信息,包括主题标识符和时间戳组成的唯一标识符、请求详细内容,以对应的mqtt主题发布请求;请求发起方的redis中添加关于这次请求信息和请求状态的记录;
31.接收请求:接收请求的节点即请求接收方的mqtt客户端通过订阅的对应mqtt主题接收到请求信息,比对redis中是否已经有该唯一标识符对应的记录;若已经有对应记录了,不存储这条请求信息;若无,将信息存入redis中;
32.处理请求:请求接收方的业务处理单元从redis中获取接收信息,解析出具体请求内容;根据请求内容,业务处理单元执行相应功能逻辑;处理完成后,整理为反馈信息,包括请求所需返回内容、请求执行情况、唯一标识符;请求接收方的mqtt客户端以对应的mqtt反馈主题发布反馈;同时设置redis中这条信息记录的有效时间为等待时间t,经过t时间后该记录失效;
33.接收反馈:请求发起方的mqtt客户端通过对应反馈主题获取反馈信息,将信息存入redis中;redis中根据请求类型对接收到的信息根据指令分类,形成执行队列,业务处理单元从redis中获取信息,执行相应功能逻辑,并将这次请求信息和结果存入关系型数据库;业务完成后,改变redis中该请求记录中的状态,不再接收对应的反信息,并设置该条请求记录的有效时间为等待时间t,经过时间t后该记录失效。
34.本发明还要求保护一种基于mqtt和数据库的消息传输系统,包括mqtt单元,redis,业务处理单元和关系型数据库;
35.所述mqtt单元包括配置在各节点的mqtt客户端和配置在云端服务器的mqtt服务端,所述mqtt客户端用于各节点发布请求/反馈信息和接收请求/反馈信息,所述mqtt服务端用于转发各节点上mqtt客户端发布的信息;
36.所述redis配置在各节点上,用于存储节点权限、存储请求状态、存储接收的信息与存储反馈信息;
37.所述业务处理单元配置在各节点上,用于对请求信息或反馈信息进行处理、对关系型数据库进行操作;
38.所述关系型数据库配置在各节点上,用于存储该节点的数据信息、请求处理信息。
39.优选的,所述redis存储节点权限,包括每个节点能发布、订阅的主题。
40.优选的,所述redis存储节点权限,当中心节点有多个且互相同步,为防止多个中心节点同时接收到反馈信息进行处理,导致后续出现冲突问题,不同的中心节点将负责固定的一个或多个请求主题,在节点权限中记录。
41.进一步的,该系统的具体实现过程如下:
42.步骤1:将请求和mqtt主题进行关联,对应不同请求,约定对应的mqtt主题、mqtt反馈主题、主题标识符,部分解决消息冲突的问题;redis中存储各节点权限,各节点根据权限订阅对应主题;
43.步骤2:两个节点间进行数据传输交互,一次数据传输交互包括请求的发起与记录,请求的接收、记录与处理,请求信息的反馈与处理;消息数据通过mqtt模块进行传递,经过redis进行缓存、通过业务处理模块进行逻辑处理、通过关系型数据库进行持久化,然后将信息反馈给请求的发起方,进行相应的逻辑处理并将该次交互相关信息通过关系型数据库持久化;在这一步中,解决消息冲突与消息重复的问题;
44.步骤3:在特定情况下进行消息重传,解决消息丢失的问题,请求发起方在发出请求信息后,经过等待时间t未收到反馈信息,则重发请求信息;在重发次数达到最大重发次数n后,说明存在网络、硬件问题,该系统中不进行相对的具体处理;此时不进行重发,将该次失败的请求信息存入数据库中,便于其他途径解决问题时进行问题的回溯和原因的排查。这里的等待时间t和最大重发次数n根据具体现实情况进行设定和调整。
45.进一步的,所述步骤2具体包括以下四部分:
46.发起请求:需要发起请求的节点即请求发起方的业务处理模块整理报文有效荷载信息,包括主题标识符和时间戳组成的唯一标识符、请求详细内容,以对应的mqtt主题发布请求;请求发起方的redis中添加关于这次请求信息和请求状态的记录;
47.接收请求:接收请求的节点即请求接收方的mqtt客户端通过订阅的对应mqtt主题接收到请求信息,比对redis中是否已经有该唯一标识符对应的记录;若已经有对应记录了,不存储这条请求信息;若无,将信息存入redis中;
48.处理请求:请求接收方的业务处理单元从redis中获取接收信息,解析出具体请求内容;根据请求内容,业务处理单元执行相应功能逻辑;处理完成后,整理为反馈信息,包括请求所需返回内容、请求执行情况、唯一标识符;请求接收方的mqtt客户端以对应的mqtt反馈主题发布反馈;同时设置redis中这条信息记录的有效时间为等待时间t,经过t时间后该记录失效;
49.接收反馈:请求发起方的mqtt客户端通过对应反馈主题获取反馈信息,将信息存入redis中;redis中根据请求类型对接收到的信息根据指令分类,形成执行队列,业务处理单元从redis中获取信息,执行相应功能逻辑,并将这次请求信息和结果存入关系型数据库;业务完成后,改变redis中该请求记录中的状态,不再接收对应的反信息,并设置该条请求记录的有效时间为等待时间t,经过时间t后该记录失效。
50.本发明的一种基于mqtt和数据库的消息传输方法及系统与现有技术相比,具有以下有益效果:
51.本方法或系统利用mqtt发布/订阅模式的特性,各节点之间可以通过一条连接传输多种消息,无需重复建立新连接,简化了节点间的连接;同时,应用mqtt的qos0模式,可以在低网络、低硬件条件的情况下保证大量消息的实时可靠,保证了分布式系统中网络、硬件条件不好的节点的通信的可靠性;
52.利用重传机制保证请求稳定响应,防止qos0带来的消息丢失问题;
53.利用redis记录请求,保证请求流程的正常,防止请求及反馈的冲突;
54.利用关系型数据库存储请求结果,将请求持久化,防止请求的丢失,方便意外情况后了解请求记录和进行后续处理。
附图说明
55.图1是本发明实施例提供的基于mqtt和数据库的消息传输方法中节点结构示意图;
56.图2是本发明实施例提供的基于mqtt和数据库的消息传输方法中节点间通信过程示意图;
57.图3是本发明实施例提供的基于mqtt和数据库的消息传输方法中消息反馈重传示
意图。
具体实施方式
58.下面结合具体实施例对本发明作进一步说明。
59.本发明实施例提供了一种基于mqtt和数据库的消息传输方法,该方法的实现包括mqtt单元,redis,业务处理单元和关系型数据库;
60.所述mqtt单元包括配置在各节点的mqtt客户端和配置在云端服务器的mqtt服务端,所述mqtt客户端用于各节点发布请求/反馈信息和接收请求/反馈信息,所述mqtt服务端用于转发各节点上mqtt客户端发布的信息。
61.所述redis配置在各节点上,用于对相关信息进行快速缓存,主要包括存储节点权限、存储请求状态、存储接收的信息与存储反馈信息;节点权限主要为每个节点能发布、订阅的主题。
62.如图1所示,有时中心节点有多个且互相同步。为防止多个中心节点同时接收到反馈信息进行处理,导致后续出现冲突问题,不同的中心节点将负责固定的一个或多个请求主题,在节点权限中记录。
63.所述业务处理单元配置在各节点上,用于对请求信息或反馈信息进行处理、对关系型数据库进行操作;
64.所述关系型数据库配置在各节点上,用于存储该节点的数据信息、请求处理信息,对数据和处理结果进行持久化,便于后期回溯问题、解决问题、总结问题。
65.该方法的具体实现方式如下:
66.步骤1:将请求和mqtt主题进行关联,对应不同请求,约定对应的mqtt主题、mqtt反馈主题、主题标识符,部分解决消息冲突的问题;redis中存储各节点权限,各节点根据权限订阅对应主题。
67.步骤2:如图2所示,两个节点间进行数据传输交互。一次数据传输交互包括请求的发起与记录,请求的接收、记录与处理,请求信息的反馈与处理。消息数据通过mqtt模块进行传递,经过redis进行缓存、通过业务处理模块进行逻辑处理、通过关系型数据库进行持久化,然后将信息反馈给请求的发起方,进行相应的逻辑处理并将该次交互相关信息通过关系型数据库持久化。在这一步中,解决消息冲突与消息重复的问题。这个步骤具体包括以下四部分:
68.发起请求:需要发起请求的节点(下称请求发起方)的业务处理模块整理报文有效荷载信息,主要包括主题标识符和时间戳组成的唯一标识符、请求详细内容,以对应的mqtt主题发布请求;请求发起方的redis中添加关于这次请求信息和请求状态的记录;
69.接收请求:接收请求的节点(下称请求接收方)的mqtt客户端通过订阅的对应mqtt主题接收到请求信息,比对redis中是否已经有该唯一标识符对应的记录;若已经有对应记录了,不存储这条请求信息;若无,将信息存入redis中;
70.处理请求:请求接收方的业务处理单元从redis中获取接收信息,解析出具体请求内容;根据请求内容,业务处理单元执行相应功能逻辑;处理完成后,整理为反馈信息,主要包括请求所需返回内容、请求执行情况、唯一标识符;请求接收方的mqtt客户端以对应的mqtt反馈主题发布反馈;同时设置redis中这条信息记录的有效时间为等待时间t,经过t时
间后该记录失效;
71.接收反馈:请求发起方的mqtt客户端通过对应反馈主题获取反馈信息,将信息存入redis中;redis中根据请求类型对接收到的信息根据指令分类,形成执行队列,业务处理单元从redis中获取信息,执行相应功能逻辑,并将这次请求信息和结果存入关系型数据库;业务完成后,改变redis中该请求记录中的状态,不再接收对应的反信息,并设置该条请求记录的有效时间为等待时间t,经过时间t后该记录失效。
72.步骤3:如图2所示,在特定情况下进行消息重传,解决消息丢失的问题。请求发起方在发出请求信息后,经过等待时间t未收到反馈信息,则重发请求信息;在重发次数达到最大重发次数n后,说明存在网络、硬件问题,该系统中不进行相对的具体处理;此时不进行重发,将该次失败的请求信息存入数据库中,便于其他途径解决问题时进行问题的回溯和原因的排查。这里的等待时间t和最大重发次数n根据具体现实情况进行设定和调整。
73.本方法中,mqtt单元包括用于消息传输的mqtt客户端和mqtt服务端,redis用于存储节点权限、存储请求状态和存储接收的信息,业务处理单元用于执行对应的处理逻辑,关系型数据库用于存储该节点的数据信息、请求处理信息;一次完整的消息传输过程包括请求的发起与记录,请求的接收、记录与处理,请求信息的反馈与处理。通过该方法,弥补现有技术的缺陷,解决分布式系统中使用qos0机制mqtt所造成的消息重复、消息冲突、消息丢失问题,简化分布式节点间的连接并保证信息的实时可靠。
74.本发明实施例还提供一种基于mqtt和数据库的消息传输系统,包括mqtt单元,redis,业务处理单元和关系型数据库;
75.所述mqtt单元包括配置在各节点的mqtt客户端和配置在云端服务器的mqtt服务端,所述mqtt客户端用于各节点发布请求/反馈信息和接收请求/反馈信息,所述mqtt服务端用于转发各节点上mqtt客户端发布的信息。
76.所述redis配置在各节点上,用于对相关信息进行快速缓存,主要包括存储节点权限、存储请求状态、存储接收的信息与存储反馈信息;节点权限主要为每个节点能发布、订阅的主题。
77.有时中心节点有多个且互相同步。为防止多个中心节点同时接收到反馈信息进行处理,导致后续出现冲突问题,不同的中心节点将负责固定的一个或多个请求主题,在节点权限中记录。
78.所述业务处理单元配置在各节点上,是系统中完成业务要求及相关逻辑处理的具体处理单元,根据具体业务进行不同的设计。用于对请求信息或反馈信息进行处理、对关系型数据库进行操作。
79.所述关系型数据库配置在各节点上,用于存储该节点的数据信息、请求处理信息,对数据和处理结果进行持久化,便于后期回溯问题、解决问题、总结问题。
80.该系统的具体实现过程如下:
81.步骤1:将请求和mqtt主题进行关联,对应不同请求,约定对应的mqtt主题、mqtt反馈主题、主题标识符,部分解决消息冲突的问题;redis中存储各节点权限,各节点根据权限订阅对应主题。
82.步骤2:两个节点间进行数据传输交互,一次数据传输交互包括请求的发起与记录,请求的接收、记录与处理,请求信息的反馈与处理;消息数据通过mqtt模块进行传递,经
过redis进行缓存、通过业务处理模块进行逻辑处理、通过关系型数据库进行持久化,然后将信息反馈给请求的发起方,进行相应的逻辑处理并将该次交互相关信息通过关系型数据库持久化;在这一步中,解决消息冲突与消息重复的问题。该步骤具体包括以下四部分:
83.发起请求:需要发起请求的节点(下称请求发起方)的业务处理模块整理报文有效荷载信息,主要包括主题标识符和时间戳组成的唯一标识符、请求详细内容,以对应的mqtt主题发布请求;请求发起方的redis中添加关于这次请求信息和请求状态的记录。
84.接收请求:接收请求的节点(下称请求接收方)的mqtt客户端通过订阅的对应mqtt主题接收到请求信息,比对redis中是否已经有该唯一标识符对应的记录;若已经有对应记录了,不存储这条请求信息;若无,将信息存入redis中。
85.处理请求:请求接收方的业务处理单元从redis中获取接收信息,解析出具体请求内容;根据请求内容,业务处理单元执行相应功能逻辑;处理完成后,整理为反馈信息,主要包括请求所需返回内容、请求执行情况、唯一标识符;请求接收方的mqtt客户端以对应的mqtt反馈主题发布反馈;同时设置redis中这条信息记录的有效时间为等待时间t,经过t时间后该记录失效。
86.接收反馈:请求发起方的mqtt客户端通过对应反馈主题获取反馈信息,将信息存入redis中;redis中根据请求类型对接收到的信息根据指令分类,形成执行队列,业务处理单元从redis中获取信息,执行相应功能逻辑,并将这次请求信息和结果存入关系型数据库;业务完成后,改变redis中该请求记录中的状态,不再接收对应的反信息,并设置该条请求记录的有效时间为等待时间t,经过时间t后该记录失效。
87.步骤3:在特定情况下进行消息重传,解决消息丢失的问题,请求发起方在发出请求信息后,经过等待时间t未收到反馈信息,则重发请求信息;在重发次数达到最大重发次数n后,说明存在网络、硬件问题,该系统中不进行相对的具体处理;此时不进行重发,将该次失败的请求信息存入数据库中,便于其他途径解决问题时进行问题的回溯和原因的排查。这里的等待时间t和最大重发次数n根据具体现实情况进行设定和调整。
88.通过上面具体实施方式,所述技术领域的技术人员可容易的实现本发明。但是应当理解,本发明并不限于上述的具体实施方式。在公开的实施方式的基础上,所述技术领域的技术人员可任意组合不同的技术特征,从而实现不同的技术方案。
89.除说明书所述的技术特征外,均为本专业技术人员的已知技术。

技术特征:
1.一种基于mqtt和数据库的消息传输方法,其特征在于,该方法的实现包括mqtt单元,redis,业务处理单元和关系型数据库;所述mqtt单元包括配置在各节点的mqtt客户端和配置在云端服务器的mqtt服务端,所述mqtt客户端用于各节点发布请求/反馈信息和接收请求/反馈信息,所述mqtt服务端用于转发各节点上mqtt客户端发布的信息;所述redis配置在各节点上,用于存储节点权限、存储请求状态、存储接收的信息与存储反馈信息;所述业务处理单元配置在各节点上,用于对请求信息或反馈信息进行处理、对关系型数据库进行操作;所述关系型数据库配置在各节点上,用于存储该节点的数据信息、请求处理信息。2.根据权利要求1所述的一种基于mqtt和数据库的消息传输方法,其特征在于,所述redis存储节点权限,包括每个节点能发布、订阅的主题。3.根据权利要求1或2所述的一种基于mqtt和数据库的消息传输方法,其特征在于,所述redis存储节点权限,当中心节点有多个且互相同步,不同的中心节点将负责固定的一个或多个请求主题,在节点权限中记录。4.根据权利要求1所述的一种基于mqtt和数据库的消息传输方法,其特征在于,该方法的具体实现方式如下:步骤1:将请求和mqtt主题进行关联,对应不同请求,约定对应的mqtt主题、mqtt反馈主题、主题标识符;redis中存储各节点权限,各节点根据权限订阅对应主题;步骤2:两个节点间进行数据传输交互,一次数据传输交互包括请求的发起与记录,请求的接收、记录与处理,请求信息的反馈与处理;消息数据通过mqtt模块进行传递,经过redis进行缓存、通过业务处理模块进行逻辑处理、通过关系型数据库进行持久化,然后将信息反馈给请求的发起方,进行相应的逻辑处理并将该次交互相关信息通过关系型数据库持久化;步骤3:请求发起方在发出请求信息后,经过等待时间t未收到反馈信息,则重发请求信息;在重发次数达到最大重发次数n后,说明存在网络、硬件问题,不进行相对的具体处理;此时不进行重发,将该次失败的请求信息存入数据库中,便于其他途径解决问题时进行问题的回溯和原因的排查。5.根据权利要求4所述的一种基于mqtt和数据库的消息传输方法,其特征在于,所述步骤2具体包括以下四部分:发起请求:需要发起请求的节点即请求发起方的业务处理模块整理报文有效荷载信息,包括主题标识符和时间戳组成的唯一标识符、请求详细内容,以对应的mqtt主题发布请求;请求发起方的redis中添加关于这次请求信息和请求状态的记录;接收请求:接收请求的节点即请求接收方的mqtt客户端通过订阅的对应mqtt主题接收到请求信息,比对redis中是否已经有该唯一标识符对应的记录;若已经有对应记录了,不存储这条请求信息;若无,将信息存入redis中;处理请求:请求接收方的业务处理单元从redis中获取接收信息,解析出具体请求内容;根据请求内容,业务处理单元执行相应功能逻辑;处理完成后,整理为反馈信息,包括请求所需返回内容、请求执行情况、唯一标识符;请求接收方的mqtt客户端以对应的mqtt反馈
主题发布反馈;同时设置redis中这条信息记录的有效时间为等待时间t,经过t时间后该记录失效;接收反馈:请求发起方的mqtt客户端通过对应反馈主题获取反馈信息,将信息存入redis中;redis中根据请求类型对接收到的信息根据指令分类,形成执行队列,业务处理单元从redis中获取信息,执行相应功能逻辑,并将这次请求信息和结果存入关系型数据库;业务完成后,改变redis中该请求记录中的状态,不再接收对应的反信息,并设置该条请求记录的有效时间为等待时间t,经过时间t后该记录失效。6.一种基于mqtt和数据库的消息传输系统,其特征在于,包括mqtt单元,redis,业务处理单元和关系型数据库;所述mqtt单元包括配置在各节点的mqtt客户端和配置在云端服务器的mqtt服务端,所述mqtt客户端用于各节点发布请求/反馈信息和接收请求/反馈信息,所述mqtt服务端用于转发各节点上mqtt客户端发布的信息;所述redis配置在各节点上,用于存储节点权限、存储请求状态、存储接收的信息与存储反馈信息;所述业务处理单元配置在各节点上,用于对请求信息或反馈信息进行处理、对关系型数据库进行操作;所述关系型数据库配置在各节点上,用于存储该节点的数据信息、请求处理信息。7.根据权利要求6所述的一种基于mqtt和数据库的消息传输系统,其特征在于,所述redis存储节点权限,包括每个节点能发布、订阅的主题。8.根据权利要求6或7所述的一种基于mqtt和数据库的消息传输系统,其特征在于,所述redis存储节点权限,当中心节点有多个且互相同步,不同的中心节点将负责固定的一个或多个请求主题,在节点权限中记录。9.根据权利要求6所述的一种基于mqtt和数据库的消息传输系统,其特征在于,该系统的具体实现过程如下:步骤1:将请求和mqtt主题进行关联,对应不同请求,约定对应的mqtt主题、mqtt反馈主题、主题标识符;redis中存储各节点权限,各节点根据权限订阅对应主题;步骤2:两个节点间进行数据传输交互,一次数据传输交互包括请求的发起与记录,请求的接收、记录与处理,请求信息的反馈与处理;消息数据通过mqtt模块进行传递,经过redis进行缓存、通过业务处理模块进行逻辑处理、通过关系型数据库进行持久化,然后将信息反馈给请求的发起方,进行相应的逻辑处理并将该次交互相关信息通过关系型数据库持久化;步骤3:请求发起方在发出请求信息后,经过等待时间t未收到反馈信息,则重发请求信息;在重发次数达到最大重发次数n后,说明存在网络、硬件问题,该系统中不进行相对的具体处理;此时不进行重发,将该次失败的请求信息存入数据库中,便于其他途径解决问题时进行问题的回溯和原因的排查。10.根据权利要求9所述的一种基于mqtt和数据库的消息传输系统,其特征在于,所述步骤2具体包括以下四部分:发起请求:需要发起请求的节点即请求发起方的业务处理模块整理报文有效荷载信息,包括主题标识符和时间戳组成的唯一标识符、请求详细内容,以对应的mqtt主题发布请
求;请求发起方的redis中添加关于这次请求信息和请求状态的记录;接收请求:接收请求的节点即请求接收方的mqtt客户端通过订阅的对应mqtt主题接收到请求信息,比对redis中是否已经有该唯一标识符对应的记录;若已经有对应记录了,不存储这条请求信息;若无,将信息存入redis中;处理请求:请求接收方的业务处理单元从redis中获取接收信息,解析出具体请求内容;根据请求内容,业务处理单元执行相应功能逻辑;处理完成后,整理为反馈信息,包括请求所需返回内容、请求执行情况、唯一标识符;请求接收方的mqtt客户端以对应的mqtt反馈主题发布反馈;同时设置redis中这条信息记录的有效时间为等待时间t,经过t时间后该记录失效;接收反馈:请求发起方的mqtt客户端通过对应反馈主题获取反馈信息,将信息存入redis中;redis中根据请求类型对接收到的信息根据指令分类,形成执行队列,业务处理单元从redis中获取信息,执行相应功能逻辑,并将这次请求信息和结果存入关系型数据库;业务完成后,改变redis中该请求记录中的状态,不再接收对应的反信息,并设置该条请求记录的有效时间为等待时间t,经过时间t后该记录失效。

技术总结
本发明公开了一种基于MQTT和数据库的消息传输方法及系统,属于数据通讯技术领域,该方法的实现包括MQTT单元,Redis,业务处理单元和关系型数据库;所述MQTT单元包括配置在各节点的MQTT客户端和配置在云端服务器的MQTT服务端;所述Redis配置在各节点上,用于存储节点权限、存储请求状态、存储接收的信息与存储反馈信息;所述业务处理单元配置在各节点上,用于对请求信息或反馈信息进行处理、对关系型数据库进行操作;所述关系型数据库配置在各节点上,用于存储该节点的数据信息、请求处理信息。本发明能够解决分布式系统中使用QoS0机制MQTT所造成的消息重复、消息冲突、消息丢失问题。题。题。


技术研发人员:夏晨朗 郭春杰 黄沛明 杨继超 王中明 李胜
受保护的技术使用者:苏州思萃工业互联网技术研究所有限公司
技术研发日:2023.06.27
技术公布日:2023/9/7
版权声明

本文仅代表作者观点,不代表航家之家立场。
本文系作者授权航家号发表,未经原创作者书面授权,任何单位或个人不得引用、复制、转载、摘编、链接或以其他任何方式复制发表。任何单位或个人在获得书面授权使用航空之家内容时,须注明作者及来源 “航空之家”。如非法使用航空之家的部分或全部内容的,航空之家将依法追究其法律责任。(航空之家官方QQ:2926969996)

航空之家 https://www.aerohome.com.cn/

飞机超市 https://mall.aerohome.com.cn/

航空资讯 https://news.aerohome.com.cn/

分享:

扫一扫在手机阅读、分享本文

相关推荐