build_embed_linux_system

mqtt组网和客户端应用实现

MQTT,全称(Message Queuing Telemetry Transport)是一种轻量级、基于发布-订阅模式的消息传输协议。适用于资源受限的设备和低带宽、高延迟或不稳定的网络环境。因此常用于物联网、工业自动化、车联网和能源管理等领域。

MQTT具有以下特点:

  1. 轻量级:报文格式简单,头部开销小,这使得它在资源受限的设备上也能轻松运行,减少了设备的处理负担和网络带宽的占用。
  2. 发布-订阅模式:支持发布-订阅模式,即多个客户端可以订阅同一个主题,当有消息发布到该主题时,所有订阅该主题的客户端都会收到该消息。解耦了消息的发送者和接收者,提高了灵活性
  3. Qos支持:支持三种 QoS(Quality of Service)级别,分别是 0、1 和 2,用于确保消息的可靠传输。
  4. 保持活动机制: 支持保持活动机制,即当客户端断开连接后,服务器会保留该客户端的订阅关系和消息,当客户端重新连接时,服务器会将该客户端的订阅关系和消息重新发送给该客户端。
  5. 支持多种传输协议:支持多种传输协议,如TCP/IP、WebSocket等,使得MQTT协议在多种网络环境中都能很好的运行。

本文以应用为主干,结合MQTT协议的特点,内容目录如下所示。

mqtt_work

MQTT是基于发布-订阅模式的通信协议,由MQTT客户端通过主题(Topic)发布或订阅消息,通过MQTT-Broker集中管理消息路由;并依据预设的服务质量等级(QoS)确保端到端消息传递可靠性。其中各部分说明如下。

另一个关键信息就发布-订阅模式,它解耦了消息的发送者(发布者)和接收者(订阅者);发布者和订阅者之间无需建立直接连接,而是通过MQTT-Broker来负责消息的路由和分发;这里展示基于Mosquitto的MQTT的通讯框架典型应用实现。

image

由上图可知,MQTT的工作流程如下所示。

  1. 客户端使用MQTT框架与MQTT-Broker建立连接,可以选择使用TLS/SSL加密来实现安全通信。客户端提供认证信息,并指定会话类型(Clean Session 或 Persistent Session)。
  2. 客户端既可以向特定主题发布消息,也可以订阅主题以接收消息。当客户端发布消息时,它会将消息发送给 MQTT-Broker;而当客户端订阅消息时,它会接收与订阅主题相关的消息。
  3. MQTT-Broker接收发布的消息,并将这些消息转发给订阅了对应主题的客户端。它根据 QoS 等级确保消息可靠传递,并根据会话类型为断开连接的客户端存储消息。

下面开始从应用角度来介绍MQTT的实现。

mqtt_broker

对于mqtt的应用,第一步是搭建自己服务,这里使用mosquitto,在ubuntu系统下安装mosquitto,命令如下所示。

# 安装mosquitto服务器
sudo apt-get install mosquitto mosquitto-clients

安装完成后,启动mosquitto,命令如下所示。

# 启动mosquitto服务器
sudo systemctl start mosquitto

启动完成后,查看mosquitto的状态,命令如下所示。

# 查看mosquitto服务器状态
systemctl status mosquitto

image

由上图可知,mosquitto服务器已经启动成功。

此时可以通过如下命令测试mosquitto服务器是否正常。

# 订阅本地地址(127.0.0.1:1883)上的/topic/temperature主题
mosquitto_sub -h localhost -t /topic/temperature &

# 发布消息
mosquitto_pub -h localhost -t /topic/temperature -m "Hello, MQTT!"

image

由上图可知,mosquitto服务器已经正常运行,已经能够正常的订阅和发布消息。

当然对于嵌入式系统平台,需要交叉编译mosquitto源码来实现。这一步参考如下步骤:

下一步就是构建mqtt客户端,这里有多种方案。

  1. 基于mosquitto库,通过C/C++实现mqtt客户端
  2. 使用paho-mqtt库,通过python实现mqtt客户端

mqtt_client_cpp

mqtt客户端基于C/C++实现时,首先系统要支持MQTT的库,对于桌面端平台,通过如下安装。

# 安装mosquitto库
sudo apt-get install libmosquitto-dev libmosquittopp-dev

安装完成后,使用-lmosquitto、-lmosquittopp即可链接编译mqtt客户端,编译脚本如下。

# 支持链接libmosquitto.so、libmosquittopp.so
CFLAGS += -O1 -lm -lmosquitto -lmosquittopp -std=c++17

具体C++实现代码如下所示。

#include "mosquittopp.h"

typedef struct
{
    std::string id;
    std::string host;
    int port;
    std::string sub_topic;
    int keepalive{60};
    int qos{1};
}mqtt_info;

using mosqpp::mosquittopp;

class mqtt_device : public mosquittopp
{
public:
    /// \brief constructor
    /// \param info -- info used to initialize the mqtt service.
    mqtt_device(const mqtt_info &info, std::function<void(char *ptr, int size)> handler)
    :mosquittopp(info.id.c_str()) {
        info_ = info;
        func_handler_ = handler;
        memset(buffer_, 0, sizeof(buffer_));
    }

    /// \brief destructor
    ~mqtt_device() {
    };

    /// \brief on_connect
    /// - 处理mqtt连接时的回调,在此处订阅主题。
    /// \param rc -- state for mqtt connect, 0 is success.
    void on_connect(int rc) {

        std::cout << "Connected with broker, state: " << rc << std::endl;
        if (rc == 0) {
            /* Only attempt to subscribe on a successful connect. */
            if (!info_.sub_topic.empty()) {
                subscribe(NULL, info_.sub_topic.c_str(), info_.qos);
            } else {
                std::cout << "Sub topic is empty." << std::endl;
            }
            
            is_connet_ = true;
        }
    }

    /// \brief on_message
    /// - This method is used do mqtt message receive
    /// \param message -- memssage when receive from subscribe topic
    void on_message(const struct mosquitto_message *message) {
        if (!strcmp(message->topic, info_.sub_topic.c_str())) {
            /* Copy N-1 bytes to ensure always 0 terminated. */
            memcpy(buffer_, message->payload, message->payloadlen);
            buffer_[message->payloadlen] = '\0';
            if (func_handler_) {
                func_handler_(buffer_, message->payloadlen);
            }
        }
    }

    /// \brief on_subscribe
    /// - This method is used do subscribe success.
    /// \param mid -- message id
    /// \param qos_count -- qos计数
    /// \param granted_qos -- qos数组
    void on_subscribe(int mid, int qos_count, const int *granted_qos) {
        std::cout << "Subscribe succeeded." << std::endl;
    }

    /// \brief publish_msg
    /// - This method is publish msg str.
    /// \param str -- memssage when receive by subscribe
    /// \return publish msg process.
    int publish_msg(const std::string &topic, int qos, const char* ptr, int size) {
        int ret = -1;

        if (is_connet_ && size > 0) {
            if (!topic.empty()) {
                ret = publish(NULL, topic.c_str(), size, ptr, qos);
            } else {
                std::cout << "topic is empty." << std::endl;
            }
        }
        
        return ret;
    }

    /// \brief start
    /// - This method is used to start mqtt process.
    bool start() {
        thread_ = std::thread(std::bind(&mqtt_device::mqtt_run, this));
        thread_.detach();

        return true;
    }
private:
    /// \brief mqtt_run
    /// - mqtt loop run thread.
    void mqtt_run() {
        mosqpp::lib_init();

        std::cout << "mqtt run..." << std::endl;
        
        connect(info_.host.c_str(), info_.port, info_.keepalive);
    
        while (1) {
            loop_forever();
        }
    
        mosqpp::lib_cleanup();
    }

private:
    /// \brief info_
    /// - info used to store mqtt config.
    mqtt_info info_;

    /// \brief is_connet_
    /// - wheather mqtt success connect.
    bool is_connet_{false};

    /// \brief thread_
    /// - mqtt run thread object.
    std::thread thread_;

    /// \brief buffer_
    /// - buffer store subscription information.
    char buffer_[512];

    /// \brief buffer_
    /// - buffer store subscription information.
    std::function<void(char *ptr, int size)> func_handler_;
};

在应用中,使用如下代码即可实现mqtt的创建,订阅和发布。

int main(int argc, char *argv[])
{
    mqtt_info mqtt_process_info = {
        id: "mqtt_node0",
        host: "172.27.83.254",
        port:1883,
        sub_topic:"/topic/node0",
        keepalive:60,
        qos:1
    };
    uint32_t temp = 0;
    std::string temp_str;;

    std::shared_ptr<mqtt_device> mqtt_client = std::make_shared<mqtt_device>(mqtt_process_info, [](char *ptr, int size) {
        std::cout << "recv msg: " << ptr << std::endl;
    });

    // 启动mqtt服务
    mqtt_client->start();

    while(1) {
        temp++;
        if (temp > 100) {
            temp = 0;
        }
        temp_str = std::to_string(temp);

        //周期性的提交数据
        mqtt_client->publish_msg("/topic/temperature", 1, temp_str.c_str(), temp_str.size());
        std::this_thread::sleep_for(std::chrono::seconds(3));
    }

    return 0;
}

可以使用mqtt_clients来测试数据是否正常,命令如下所示。

# 使用mosquitto获取mqtt服务
mosquitto_sub -h localhost -t /topic/temperature

image

由上图可知,mqtt客户端已经正常运行。

本节例程详见如下所示。

mqtt_client_python

MQTT-Client的python客户端基于paho-mqtt库实现。

首先安装paho-mqtt库,命令如下所示。

pip3 install paho.mqtt -i https://pypi.tuna.tsinghua.edu.cn/simple/

安装完成后,使用如下代码即可实现mqtt的创建,订阅和发布。

import paho.mqtt.client as mqtt
import time

mqtt_client = mqtt.Client()

# 连接成功回调
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected with result code " + str(rc))
        # 连接成功后订阅主题
        client.subscribe("/topic/temperature")
    else:
        print(f"Failed to connect, return code {rc}")

# 接收到消息回调
def on_message(client, userdata, msg):
    print(f"Received message on topic '{msg.topic}': {msg.payload.decode()}")

mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message

broker_address = "localhost"
broker_port = 1883
mqtt_client.connect(broker_address, broker_port)

mqtt_client.loop_start()

try:
    while True:
        # 发送消息
        mqtt_client.publish("test/topic", "Hello, World!")
        time.sleep(5)
except KeyboardInterrupt:
    print("Disconnecting from MQTT-Broker...")
    mqtt_client.loop_stop()
    mqtt_client.disconnect()

使用如下命令执行。

# 启动mqtt客户端,订阅/topic/temperature
python3 mqtt_client.py

之后再执行上一章节的mqtt_node_0节点,发布消息;此时执行结果如下所示。

image

详细代码可见: mqtt_client.py

summary

至此,关于MQTT相关内容初步介绍完毕。本文围绕MQTT展开,介绍其概念、特点、工作机制;并基于开源的协议和工具,描述构建和使用MQTT-Broker和MQTT-Client的方法。MQTT应用上通过解耦,将传统的CS框架(客户端-服务器模式),改为客户端-代理Broker-客户端的模式;这样单个节点的断开对于整个系统的影响就会很小,同时降低了实现一对多、多对多通讯的系统框架;通过MQTT-Broker承接了关键的连接,消息分发功能,降低了边缘智能设备的负载,同时通过QOS一定程度上确保了了消息传递的可靠性,使得边缘智能设备可以更加专注于业务逻辑的开发;如果希望了解智能家居,边缘计算,对于MQTT协议的理解,将是重要的环节。本文只能算是应用方面的介绍,更多关于MQTT的细节,请参考MQTT官网,并结合应用去实践。

next_chapter

返回目录

直接开始下一节说明: 综合应用开发