MQTT,全称(Message Queuing Telemetry Transport)是一种轻量级、基于发布-订阅模式的消息传输协议。适用于资源受限的设备和低带宽、高延迟或不稳定的网络环境。因此常用于物联网、工业自动化、车联网和能源管理等领域。
MQTT具有以下特点:
本文以应用为主干,结合MQTT协议的特点,内容目录如下所示。
MQTT是基于发布-订阅模式的通信协议,由MQTT客户端通过主题(Topic)发布或订阅消息,通过MQTT-Broker集中管理消息路由;并依据预设的服务质量等级(QoS)确保端到端消息传递可靠性。其中各部分说明如下。
MQTT客户端:任何运行MQTT客户端库的应用或设备都是MQTT客户端。例如,使用MQTT的即时通讯应用是客户端,使用MQTT上报数据的各种传感器是客户端,各种MQTT测试工具也是客户端。
主题(Topic):MQTT协议中的一个重要概念,用于标识消息的目的地。客户端可以订阅一个或多个主题,当有消息发布到这些主题时,所有订阅该主题的客户端都会收到该消息。主题通常由多个层级组成,用“/”分隔,例如“/topic/temp”表示温度主题。
另一个关键信息就发布-订阅模式,它解耦了消息的发送者(发布者)和接收者(订阅者);发布者和订阅者之间无需建立直接连接,而是通过MQTT-Broker来负责消息的路由和分发;这里展示基于Mosquitto的MQTT的通讯框架典型应用实现。
由上图可知,MQTT的工作流程如下所示。
下面开始从应用角度来介绍MQTT的实现。
对于mqtt的应用,第一步是搭建自己服务,这里使用mosquitto,在ubuntu系统下安装mosquitto,命令如下所示。
# 安装mosquitto服务器
sudo apt-get install mosquitto mosquitto-clients
安装完成后,启动mosquitto,命令如下所示。
# 启动mosquitto服务器
sudo systemctl start mosquitto
启动完成后,查看mosquitto的状态,命令如下所示。
# 查看mosquitto服务器状态
systemctl status mosquitto
由上图可知,mosquitto服务器已经启动成功。
此时可以通过如下命令测试mosquitto服务器是否正常。
# 订阅本地地址(127.0.0.1:1883)上的/topic/temperature主题
mosquitto_sub -h localhost -t /topic/temperature &
# 发布消息
mosquitto_pub -h localhost -t /topic/temperature -m "Hello, MQTT!"
由上图可知,mosquitto服务器已经正常运行,已经能够正常的订阅和发布消息。
当然对于嵌入式系统平台,需要交叉编译mosquitto源码来实现。这一步参考如下步骤:
下一步就是构建mqtt客户端,这里有多种方案。
mqtt客户端基于C/C++实现时,首先系统要支持MQTT的库,对于桌面端平台,通过如下安装。
# 安装mosquitto库
sudo apt-get install libmosquitto-dev libmosquittopp-dev
安装完成后,使用-lmosquitto、-lmosquittopp即可链接编译mqtt客户端,编译脚本如下。
# 支持链接libmosquitto.so、libmosquittopp.so
CFLAGS += -O1 -lm -lmosquitto -lmosquittopp -std=c++17
具体C++实现代码如下所示。
#include "mosquittopp.h"
typedef struct
{
std::string id;
std::string host;
int port;
std::string sub_topic;
int keepalive{60};
int qos{1};
}mqtt_info;
using mosqpp::mosquittopp;
class mqtt_device : public mosquittopp
{
public:
/// \brief constructor
/// \param info -- info used to initialize the mqtt service.
mqtt_device(const mqtt_info &info, std::function<void(char *ptr, int size)> handler)
:mosquittopp(info.id.c_str()) {
info_ = info;
func_handler_ = handler;
memset(buffer_, 0, sizeof(buffer_));
}
/// \brief destructor
~mqtt_device() {
};
/// \brief on_connect
/// - 处理mqtt连接时的回调,在此处订阅主题。
/// \param rc -- state for mqtt connect, 0 is success.
void on_connect(int rc) {
std::cout << "Connected with broker, state: " << rc << std::endl;
if (rc == 0) {
/* Only attempt to subscribe on a successful connect. */
if (!info_.sub_topic.empty()) {
subscribe(NULL, info_.sub_topic.c_str(), info_.qos);
} else {
std::cout << "Sub topic is empty." << std::endl;
}
is_connet_ = true;
}
}
/// \brief on_message
/// - This method is used do mqtt message receive
/// \param message -- memssage when receive from subscribe topic
void on_message(const struct mosquitto_message *message) {
if (!strcmp(message->topic, info_.sub_topic.c_str())) {
/* Copy N-1 bytes to ensure always 0 terminated. */
memcpy(buffer_, message->payload, message->payloadlen);
buffer_[message->payloadlen] = '\0';
if (func_handler_) {
func_handler_(buffer_, message->payloadlen);
}
}
}
/// \brief on_subscribe
/// - This method is used do subscribe success.
/// \param mid -- message id
/// \param qos_count -- qos计数
/// \param granted_qos -- qos数组
void on_subscribe(int mid, int qos_count, const int *granted_qos) {
std::cout << "Subscribe succeeded." << std::endl;
}
/// \brief publish_msg
/// - This method is publish msg str.
/// \param str -- memssage when receive by subscribe
/// \return publish msg process.
int publish_msg(const std::string &topic, int qos, const char* ptr, int size) {
int ret = -1;
if (is_connet_ && size > 0) {
if (!topic.empty()) {
ret = publish(NULL, topic.c_str(), size, ptr, qos);
} else {
std::cout << "topic is empty." << std::endl;
}
}
return ret;
}
/// \brief start
/// - This method is used to start mqtt process.
bool start() {
thread_ = std::thread(std::bind(&mqtt_device::mqtt_run, this));
thread_.detach();
return true;
}
private:
/// \brief mqtt_run
/// - mqtt loop run thread.
void mqtt_run() {
mosqpp::lib_init();
std::cout << "mqtt run..." << std::endl;
connect(info_.host.c_str(), info_.port, info_.keepalive);
while (1) {
loop_forever();
}
mosqpp::lib_cleanup();
}
private:
/// \brief info_
/// - info used to store mqtt config.
mqtt_info info_;
/// \brief is_connet_
/// - wheather mqtt success connect.
bool is_connet_{false};
/// \brief thread_
/// - mqtt run thread object.
std::thread thread_;
/// \brief buffer_
/// - buffer store subscription information.
char buffer_[512];
/// \brief buffer_
/// - buffer store subscription information.
std::function<void(char *ptr, int size)> func_handler_;
};
在应用中,使用如下代码即可实现mqtt的创建,订阅和发布。
int main(int argc, char *argv[])
{
mqtt_info mqtt_process_info = {
id: "mqtt_node0",
host: "172.27.83.254",
port:1883,
sub_topic:"/topic/node0",
keepalive:60,
qos:1
};
uint32_t temp = 0;
std::string temp_str;;
std::shared_ptr<mqtt_device> mqtt_client = std::make_shared<mqtt_device>(mqtt_process_info, [](char *ptr, int size) {
std::cout << "recv msg: " << ptr << std::endl;
});
// 启动mqtt服务
mqtt_client->start();
while(1) {
temp++;
if (temp > 100) {
temp = 0;
}
temp_str = std::to_string(temp);
//周期性的提交数据
mqtt_client->publish_msg("/topic/temperature", 1, temp_str.c_str(), temp_str.size());
std::this_thread::sleep_for(std::chrono::seconds(3));
}
return 0;
}
可以使用mqtt_clients来测试数据是否正常,命令如下所示。
# 使用mosquitto获取mqtt服务
mosquitto_sub -h localhost -t /topic/temperature
由上图可知,mqtt客户端已经正常运行。
本节例程详见如下所示。
MQTT-Client的python客户端基于paho-mqtt库实现。
首先安装paho-mqtt库,命令如下所示。
pip3 install paho.mqtt -i https://pypi.tuna.tsinghua.edu.cn/simple/
安装完成后,使用如下代码即可实现mqtt的创建,订阅和发布。
import paho.mqtt.client as mqtt
import time
mqtt_client = mqtt.Client()
# 连接成功回调
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected with result code " + str(rc))
# 连接成功后订阅主题
client.subscribe("/topic/temperature")
else:
print(f"Failed to connect, return code {rc}")
# 接收到消息回调
def on_message(client, userdata, msg):
print(f"Received message on topic '{msg.topic}': {msg.payload.decode()}")
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
broker_address = "localhost"
broker_port = 1883
mqtt_client.connect(broker_address, broker_port)
mqtt_client.loop_start()
try:
while True:
# 发送消息
mqtt_client.publish("test/topic", "Hello, World!")
time.sleep(5)
except KeyboardInterrupt:
print("Disconnecting from MQTT-Broker...")
mqtt_client.loop_stop()
mqtt_client.disconnect()
使用如下命令执行。
# 启动mqtt客户端,订阅/topic/temperature
python3 mqtt_client.py
之后再执行上一章节的mqtt_node_0节点,发布消息;此时执行结果如下所示。
详细代码可见: mqtt_client.py
至此,关于MQTT相关内容初步介绍完毕。本文围绕MQTT展开,介绍其概念、特点、工作机制;并基于开源的协议和工具,描述构建和使用MQTT-Broker和MQTT-Client的方法。MQTT应用上通过解耦,将传统的CS框架(客户端-服务器模式),改为客户端-代理Broker-客户端的模式;这样单个节点的断开对于整个系统的影响就会很小,同时降低了实现一对多、多对多通讯的系统框架;通过MQTT-Broker承接了关键的连接,消息分发功能,降低了边缘智能设备的负载,同时通过QOS一定程度上确保了了消息传递的可靠性,使得边缘智能设备可以更加专注于业务逻辑的开发;如果希望了解智能家居,边缘计算,对于MQTT协议的理解,将是重要的环节。本文只能算是应用方面的介绍,更多关于MQTT的细节,请参考MQTT官网,并结合应用去实践。
直接开始下一节说明: 综合应用开发