var mqtt = require('mqtt')var client = mqtt.connect('mqtt://iot.eclipse.org', { clientId: "mqtt_sample_id_1", clean: false //持久会话})client.on('connect', function (connack) { console.log(`return code: ${connack.returnCode}, sessionPresent:${connack.sessionPresent}`) client.end()})
建立非持久化会话var mqtt = require('mqtt')var client = mqtt.connect('mqtt://iot.eclipse.org', { clientId: "mqtt_sample_id_1", clean: true //非持久会话 })client.on('connect', function (connack) { console.log(`return code: ${connack.returnCode}, sessionPresent:${connack.sessionPresent}`) client.end()})
持久会话和非持久会话的就在clean的值是false还是true订阅发布模型一个典型的 MQTT 消息发送与接收的流程如下:ClientA 连接到 Broker;ClientB 连接到 Broker,并订阅主题 Topic1;ClientA 发送给 Broker 一条消息,主题为 Topic1;Broker 收到 ClientA 的消息,发现 ClientB 订阅了 Topic1,然后将消息转发到 ClientB;ClientB 从 Broker 接收到该消息和传统的队列有点不同,如果 ClientB 在 ClientA 发布消息之后再订阅 Topic1,ClientB 不会收到该条消息MQTT 通过订阅与发布模型对消息的发布者和订阅者进行解耦,发布者在发布消息时并不需要订阅方也连接到 Broker,只要订阅方之前订阅过相应主题,那么它在连接到 Broker 之后就可以收到发布方在它离线期间发布的 消息我们称这种消息为离线消息接收离线的消息需要 Client 使用持久化会话,且发布时消息的 QoS 不小于1Publisher 和 SubscriberPublisher 和 Subscriber 是相对于 Topic 来说的身份,如果一个 Client 向某个 Topic 发布消息,那么它就是Publisher;如果一个 Client 订阅了某个 Topic,那么它就是 Subscriber在上面的例子中,ClientA 是Publisher, ClientB 是 SubscriberSender 和 ReceiverSender 和 Receiver 是相对于消息传输方向的身份,仍然是上面的例子: 当 ClientA 发布消息时,它发送给 Broker 一条消息,那么 ClientA 是 Sender,Broker 是 Receiver; 当 Broker 转发消息给 ClientB 时,Broker 是 Sender,ClientB 是 ReceiverPublisher/Subscriber、Sender/Receiver 这两组概念最大的区别就是,Publisher 和 Subscriber 只可能是Client而 Sender/Receiver 有可能是 Client 和 Broker解释清楚这两个不同的概念之后,我们接下来看一下 PUBLISH 消息包发布PUBLISH 数据包是用于在 Sender 和 Receiver 之间传输消息数据的,也就是说,当 Publisher 要向某个Topic 发布一条消息的时候,Publisher 会向 Broker 发送一个 PUBLISH 数据包;当 Broker 要将一条消息转发给订阅了某条主题的 Subscriber 时,Broker 也会向 Subscriber 发送一条 PUBLISH 数据包QoS:2bit,0、1 或者 2,代表 PUBLISH 消息的 QoS level主题名称(Topic Name):主题名称是一个 UTF-8 编码的字符串,用来命名该消息发布到哪一个主题,Topic Name 可以是长度大于等于 1 任何一个字符串(可包含空格),但是在实际项目中,我们最好还是遵循以下一些最优方法 :主题名称应该包含层级,不同的层级用 / 划分,比如,2 楼 201 房间的温度感应器可以用这个主题:“home/2ndfloor/201/temperature”主题名称开头不要使用 /,例如:“/home/2ndfloor/201/temperature”不要在主题中使用空格只使用 ASCII 字符主题名称在可读的前提下尽量短主题是大小写敏感的,“Home” 和 “home” 是两个不同的主题可以将设备的唯一标识加到主题中,比如:“warehouse/shelf/shelf1_ID/status”主题尽量精确,不要使用泛用的主题,例如在 201 房间有三个传感器,温度、亮度和湿度,那么你应该使用三个主题名称:“home/2ndfloor/201/temperature”、“home/2ndfloor/201/brightness”和“home/2ndfloor/201/humidity”,而不是让三个传感器都使用“home/2ndfloor/201”以 $ 开头的主题属于 Broker 预留的系统主题,通常用于发布 Broker 的内部统计信息,比如,应用程序不要使用 开头的主题收发数据订阅订阅主题的流程如下:Client 向 Broker 发送一个 SUBSCRIBE 数据包,其中包含了 Client 想要订阅的主题以及其他一些参数;Broker 收到 SUBSCRIBE 数据包后,向 Client 发送一个 SUBACK 数据包作为应答Subscribe订阅列表(List of Subscriptions):SUBSCRIBE 的消息体中包含 Client 想要订阅的主题列表,列表中的每一项由订阅主题名和对应的 QoS 组成主题名中可以包含通配符,单层通配符“+”和多层通配符“#”使用包含通配符的主题名可以订阅满足匹配条件的所有主题为了和 PUBLISH 中的主题区分,我们叫SUBSCRIBE 中的主题名为主题过滤器(Topic Filter)单层通配符“+”:就如之前我们讲的,MQTT 的主题是具有层级概念的,不同的层级之间用“/”分割,“+”可以用来指代任意一个层级多层通配符“#”:“#”和“+”的区别在于,“#”可以用来指定任意多个层级,但是“#”必须是 TopicFilter 的最后一个字符,同时它必须跟在“/”后面,除非 Topic Filter 只包含“#”这一个字符SUBSCRIBE 数据包中 QoS 代表针对某一个或者一组主题,Client 希望 Broker 在发送来自这些主题的消息给它时,消息使用的 QoS 级别SUBACK返回码(return codes):SUBBACK 数据包包含了一组返回码,返回码的数量和顺序和 SUBSCRIBE 数据包 的订阅列表对应,用于标识订阅类别中的每一个订阅项的订阅结果返回码含义0订阅成功, 最大可用QoS为01订阅成功,最大可用QoS为12订阅成功, 最大可用QoS为2128订阅失败返回码 0~2 代表订阅成功,同时 Broker 授予 Subscriber 不同的 QoS 等级,这个等级可能会和 Subscriber在 SUBSCRIBE 数据包中要求的不一样返回码 128 代表订阅失败,比如 Client 没有权限订阅某个主题,或者要求订阅的主题格式不正确等订阅:代码实战如果你建立的是持久会话的连接,那么有可能 Broker 已经保存你在之前的连接时订阅的主题,你就没有必要再发起 SUBSCRIBE 请求 了,这个小优化在网络带宽或者设备处理能力较差的情况尤为重要 .var mqtt = require('mqtt')var client = mqtt.connect('mqtt://iot.eclipse.org', { clientId: "mqtt_sample_subscriber_id_1", clean: false})client.on('connect', function (connack) { if(connack.returnCode == 0) { if (connack.sessionPresent == false) { console.log("subscribing") client.subscribe("home/2ndfloor/201/temperature", {qos: 1 }, function (err, granted) { if (err != undefined) { console.log("subscribe failed") } else { console.log(`subscribe succeeded with ${granted[0].topic}, qos: ${granted[0].qos}`) } }) } }else { console.log(`Connection failed: ${connack.returnCode}`) }}) client.on("message", function (_, message, _) { var jsonPayload = JSON.parse(message.toString()) console.log(`current temperature is ${jsonPayload.current}`)}
取消订阅Subcriber 也可以取消对某些主题的订阅,取消订阅的流程如下:Client 向 Broker 发送一个 UNSUBSCRIBE 数据包,其中包含了 Client 想要取消订阅的主题;Broker 收到 UNSUBSCRIBE 数据包后,向 Client 发送一个 UNSUBACK 数据包作为应答取消订阅代码实践代码很简单,在建立连接之后取消对之前订阅的主题var mqtt = require('mqtt')var client = mqtt.connect('mqtt://iot.eclipse.org', { clientId: "mqtt_sample_subscriber_id_1", clean: false})client.on('connect', function (connack) { if (connack.returnCode == 0) { console.log("unsubscribing") client.unsubscribe("home/2ndfloor/201/temperature", function (err) { if (err != undefined) { console.log("unsubscribe failed") } else { console.log("unsubscribe succeeded") } client.end() }) } else { console.log(`Connection failed: ${connack.returnCode}`) }})
服务质量QoS(Quality of Service)MQTT设计了一套 保证消息稳定传输的机制,包括消息应答,存储和重传在这套机制下,提供了三种不同层次QoS:QoS0,最多一次,至多一次; QoS1,最少一次,至少一次; QoS2,恰好一次,确保只有一次QoS是消息的发送方(Sender)和接受方(Receiver)之间达成的一个协议: QoS0代表,发送者发送的一条消息,接收者最多能收到一次,也就是说发送者尽力向接收者发送消息,如果发送失败,也就算了; QoS1代表,发送者发送的一条消息,接收者至少能收到一次,也就是说发送者向接收者发送消息,如果发送失败,会继续重试,直到接收者收到消息为止,但是因为重传的原因,Receiver有可能会收到重复的消息; QoS2代表,发送者发送的一条消息,接收者确保能收到而且只收到一次,也就是说发送者尽力向接收者发送 消息,如果发送失败,会继续重试,直到接收者收到消息为止,同时保证会因为消息重传而收到重复的消 息要注意的是,QoS 是 Sender 和 Receiver 之间达成的协议,不是 Publisher 和 Subscriber 之间达成的协议也就是说 Publisher 发布一条 QoS1 的消息,只能保证 Broker 能至少收到一次这个消息;至于对应的Subscriber 能否至少收到一次这个消息,还要取决于 Subscriber 在 Subscribe 的时候和 Broker 协商的 QoS等级在 MQTT 协议中,从 Broker 到 Subscriber 这段消息传递的实际 QoS 等于:Publisher 发布消息时指定的 QoS 等级和 Subscriber 在订阅时与 Broker 协商的 QoS 等级,这两个QoS 等级中的最小那一个公式:Actual Subscribe QoS = MIN(Publish QoS, Subscribe QoS)
保留消息保留消息让我们来看一下这个场景:你有一个温度传感器,它每三个小时向一个主题发布当前的温度那么问题来了,有一个新的订阅者在它刚刚发布了当前温度之后订阅了这个主题,那么这个订阅端什么时候能才能收到温度消息?对的,它必须等到三个小时以后,温度传感器再次发布消息的时候才能收到在这之前,这个新的订阅者对传感器的温度数据一无所知怎么来解决这个问题呢?这个时候就轮到保留消息出场解决这个问题了.保留消息是指在PUBLISH数据包中保留标识设为1(retain=1)的消息,BROKER收到这样的PUBLISH包以后,将保存这个消息,当有一个新的订阅者订阅相应主题的时候,Broker会马上将这个消息发送给订阅者保留消息有以下一些特点:一个主题只能有1条保留消息,发布新的保留消息将覆盖老的保留消息;如果订阅者使用通配符订阅主题,它会收到所有匹配的主题上的保留消息;只有新的订阅者才会收到保留消息,如果订阅者重复订阅一个主题,也会被当做新的订阅者,然后收到保留消息发送到订阅者时,消息的保留标识仍然是1,订阅者可以判断这个消息是否是保留消息,以做相应的 处理注意:保留消息和持久性会话没有任何关系,保留消息是 Broker 为每一个 Topic 单独存储的,而持久性会话是 Broker 为每一个 Client 单独存储的如果你想删除一个 保留消息也很简单,只要向这个主题发布一个 Payload 长度为 0 的 保留消息就 可以了那么开头我们提到的那个场景的解决方案就很简单了,温度传感器每 3 个小时发布当前的温度的保留消息,那么无论新的订阅者什么时候进行订阅,它都能收到温度传感器上一次发布的数据代码实战发布保留消息var mqtt = require('mqtt')var client = mqtt.connect('mqtt://iot.eclipse.org', { clientId: "mqtt_sample_publisher_1", clean: false //保持持久会话})client.on('connect', function (connack) { if(connack.returnCode == 0){ client.publish("home/2ndfloor/201/temperature", JSON.stringify({current: 25}), {qos: 0,retain: 1}, function (err) { if(err == undefined) { console.log("Publish finished") client.end() }else{ console.log("Publish failed") } }) }else{ console.log(`Connection failed: ${connack.returnCode}`) }})
订阅保留消息var mqtt = require('mqtt')var client = mqtt.connect('mqtt://iot.eclipse.org', { clientId: "mqtt_sample_subscriber_id_chapter_8", clean: false //通知Broker保持持久会话})client.on('connect', function (connack) { if(connack.returnCode == 0) { if (connack.sessionPresent == false) { console.log("subscribing") //订阅 client.subscribe("home/2ndfloor/201/temperature", {qos: 0 }, function (err, granted) { if (err != undefined) { console.log("subscribe failed") } else { console.log(`subscribe succeeded with ${granted[0].topic}, qos:${granted[0].qos}`) } }) } }else { console.log(`Connection failed: ${connack.returnCode}`) }})//处理订阅的消息client.on("message", function (_, message, packet) { var jsonPayload = JSON.parse(message.toString()) console.log(`retained: ${packet.retain}, temperature: ${jsonPayload.current}`)})
遗愿(LWT : Last Will and Testament )LWT 全称为 Last Will and Testament,也就是连接到 Broker 时提到的遗愿,包括遗愿主题、遗愿QoS、遗愿消息等当 Broker 检测到 Client 非正常地断开连接的时候,就会向遗愿主题里面发布一条消息遗愿相关的设置是在建立连接的时候,在 CONNECT 数据包里面指定的Will Flag:是否使用 LWTWill Topic:遗愿主题名,不可使用通配符Will Qos:发布遗愿消息时使用的 QoSWill Retain:遗愿消息的 Retain 标识Will Message:遗愿消息内容Broker 在以下情况下认为 Client 是非正常断开连接的:Broker 检测到底层的 I/O 异常;Client 未能在 Keep Alive 的间隔内和 Broker 之间有消息交互;Client 在关闭底层 TCP 连接前没有发送 DISCONNECT 数据包;Broker 因为协议错误关闭和 Client 的连接,比如 Client 发送了一个格式错误的 MQTT 数据包如果 Client 通过发布 DISCONNECT 数据包断开连接,这个属于正常断开连接,不会触发 LWT 的机制,同时,Broker 还会丢弃掉这个 Client 在连接时指定的 LWT 参数通常,如果我们关心一个设备,比如传感器的连接状态,可以使用 LWT在接下来的代码实践里面,我们会使用 LWT 和 Retained 消息来实现对一个 Client 的连接状态监控代码实战实现 Client 连接状态监控的原理很简单:Client 在连接的时候指定 Will Topic 为“client/status”,遗愿消息为“offline”,Will Retain=1;Client 在连接成功以后向同一个主题“client/status”,发布一个内容为“online”的 Retained 消息那么订阅者在任何时候订阅“client/status”,都会获取 Client 当前的连接状态client.js 代码如下:var mqtt = require('mqtt')var client = mqtt.connect('mqtt://iot.eclipse.org', { clientId: "mqtt_sample_publisher_chapter_8", clean: false, will:{ topic : 'client/status', qos: 1, retain: true, payload: JSON.stringify({status: 'offline'}) }})client.on('connect', function (connack) {if(connack.returnCode == 0){client.publish("client/status", JSON.stringify({status: 'online'}), {qos: 1, retain:1})}else{console.log(`Connection failed: ${connack.returnCode}`)}})
monitor.js 代码如下:var mqtt = require('mqtt')var client = mqtt.connect('mqtt://iot.eclipse.org', { clientId: "mqtt_sample_subscriber_id_chapter_8_2", clean: false })client.on('connect', function () { client.subscribe("client/status", {qos: 1})}) client.on("message", function (_, message) { var jsonPayload = JSON.parse(message.toString()) console.log(`client is ${jsonPayload.status}`)})
在monitor.js中,我们每次连接的时候都重新订阅“client / status”,这样的话每次运行都能收到关于Client连接状态的保留消息首先运行node client.js,然后运行node monitor.js,会得到以下输出:client is online
在运行client.js的终端上,使用Ctrl+C终止client.js,之后在运行monitor.js的终端上会得到以下输出: client is offline然后重新运行node client.js,在运行monitor.js的终端上会得到以下输出:client is online
Ctrl+C终止monitor.js,然后重新运行node monitor.js,会得到以下输出:client is online
这样我们就完美地监控了Client的连接状态保持连接Broker需要知道客户是否非正常地断开了和它的连接,以发送遗愿消息客户也需要能够很快地检测到它失去了和Broker的连接,以便重新连接MQTT协议是基于TCP的一个应用层协议,理论上TCP协议在丢失连接时会通知上层应用,但是TCP有一个半打开连接的问题(半开连接)在半开连接这种状态下,一端的TCP连接已经失效,但是另外一端并不知情,它认为连接依然是打开的,它需要很长的时间才能感知到对端连接已经断开了,这种情况在使用移动或者卫星网络的时候尤为常见只是依赖TCP层的连接状态监测是不够的,于是MQTT协议设计了一套Keep Alive机制在建立连接的时候,我们可以传递一个Keep Alive参数,它的单位为秒,MQTT协议中约定:在 Keep Alive的时间间隔内,如果Broker没有收到来自客户的任何数据包,那么Broker认为它和客户之间的连接已经断开;同样地, 如果客户没有收到来自Broker的任何数据包,那么Client认为它和Broker之间的连接已经断开 MQTT 还有一对 PINGREQ/PINGRESP 数据包,当 Broker 和 Client 之间没有任何数据包传输的时候,可以通过 PINGREQ/PINGRESP 来满足 Keep Alive 的约定和侦测连接状态PINGREQPINGREQ 数据包没有可变头(Variable header)和消息体(Payload),当 Client 在一个 Keep Alive 时间间隔内没有向 Broker 发送任何数据包,比如 PUBLISH 和 SUBSCRIBE 的时候,它应该向 Broker 发送PINGREQ 数据包PINGRESPPINGRESP 数据包没有可变头(Variable header)和消息体(Payload),当 Broker 收到来自 Client 的PINGREQ 数据包,它应该回复 Client 一个 PINGRESP 数据包对于 Keep Alive 机制,我们还需要记住以下几点:如果在一个 Keep Alive 时间间隔内,Client 和 Broker 有过数据包传输,比如 PUBLISH,Client 就没有必要再使用 PINGREQ 了,在网络资源比较紧张的情况下这点很重要;Keep Alive 值是由 Client 指定的,不同的 Client 可以指定不同的值;Keep Alive 的最大值为 18 小时 12 分 15 秒;Keep Alive 值如果设为 0 的话,代表不使用 Keep Alive 机制(图片来源网络,侵删)
0 评论