RT 您所在的位置:网站首页 thread通讯协议 RT

RT

2023-06-05 04:48| 来源: 网络整理| 查看: 265

RT-Thread开发之路(5)— MQTT通信

基于:使用at_device软件包通过ESP8266连接到网络

一、添加pahomqtt软件包

打开【RT-Thread Settings】,搜索pahomqtt,然后点击添加 在这里插入图片描述 然后保存使之生效。

二、编写代码,连接到服务器

首先,包含要用到的头文件:

#include "paho_mqtt.h"

宏定义一些连接mqtt服务器需要的参数:

#define MQTT_Uri "tcp://39.96.35.207:1883" // MQTT服务器的地址和端口号 #define ClientId "BearPi8266" // ClientId需要唯一 #define UserName "BearPi" // 用户名 #define PassWord "123456" // 用户名对应的密码

接下来定义一个mqtt客户端结构体变量

/* 定义一个MQTT客户端结构体 */ static MQTTClient client;

接下来对MQTT进行配置

/* 对MQTT客户端结构体变量进行配置 */ client.isconnected = 0; client.uri = MQTT_Uri; /* 配置MQTT的连接参数 */ MQTTPacket_connectData condata = MQTTPacket_connectData_initializer; memcpy(&client.condata, &condata, sizeof(condata)); client.condata.clientID.cstring = ClientId; client.condata.keepAliveInterval = 30; client.condata.cleansession = 1; client.condata.username.cstring = (char*)USERNAME; client.condata.password.cstring = (char*)PASSWORD;

然后为MQTT的消息缓存申请内存

/* 为mqtt申请内存 */ client.buf_size = client.readbuf_size = 1024; client.buf = rt_calloc(1, client.buf_size); client.readbuf = rt_calloc(1, client.readbuf_size); if (!(client.buf && client.readbuf)) { rt_kprintf("no memory for MQTT client buffer!\r\n"); return -1; }

接下来我们设置下回调函数,以及订阅一个主题,其中设置的默认的回调函数,是在如果有订阅的 Topic 没有设置回调函数时,则使用该默认回调函数

/* 设置回调函数 */ client.connect_callback = mqtt_connect_callback; client.online_callback = mqtt_online_callback; client.offline_callback = mqtt_offline_callback; /* 订阅一个主题,并设置其回调函数 */ clienssageHandlers[0].topicFilter = rt_strdup("BearPi_Sub"); clienssageHandlers[0].callback = mqtt_sub_callback; clienssageHandlers[0].qos = QOS1; /* 设置默认的回调函数 */ client.defaultMessageHandler = mqtt_sub_default_callback;

然后我们实现各个回调函数,并在上线后发布一个主题消息

/* 收到订阅的"Bear_Pi"主题的消息时的回调函数*/ static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data) { *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0'; rt_kprintf("Receive topic: %.*s, message data:\r\n", msg_data->topicName->lenstring.len, msg_data->topicName->lenstring.data); rt_kprintf("%.*s\r\n", msg_data->message->payloadlen, (char *)msg_data->message->payload); } /* 默认的订阅回调函数,如果有订阅的 Topic 没有设置回调函数,则使用该默认回调函数 */ static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data) { *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0'; rt_kprintf("Receive topic: %.*s, message data:\r\n", msg_data->topicName->lenstring.len, msg_data->topicName->lenstring.data); rt_kprintf("%.*s\r\n", msg_data->message->payloadlen, (char *)msg_data->message->payload); } /* 连接成功回调函数 */ static void mqtt_connect_callback(MQTTClient *c) { rt_kprintf("mqtt connect success! \r\n"); } /* 上线回调函数 */ static void mqtt_online_callback(MQTTClient *c) { rt_kprintf("mqtt online \r\n"); paho_mqtt_publish(&client, QOS1, "BearPi_Pub", "Hello, I am BearPi with RT_Thread."); }

都设置好后,就可以启动MQTT连接了

/* 启动 mqtt client */ paho_mqtt_start(&client);

编译,下载,运行,打开EMQ的控制台,可以看到,已经连接上了 在这里插入图片描述 另外我们也可以看到,已经收到了其发布的主题消息 在这里插入图片描述 然后我们尝试对其订阅的主题发布一个消息,可以看到,也成功接收到了 在这里插入图片描述

三、优化代码

我们修改其程序,将其放入一个线程内,并将该线程的创建放入应用初始化中,并在这个线程中每隔一秒发布一个主题消息。代码如下:

#include #include #include #include #include "netdb.h" #include #include #include #include "paho_mqtt.h" #define MQTT_Uri "tcp://39.96.35.207:1883" // MQTT服务器的地址和端口号 #define ClientId "BearPi8266" // ClientId需要唯一 #define UserName "BearPi" // 用户名 #define PassWord "123456" // 用户名对应的密码 /* 定义一个MQTT线程句柄结构体指针 */ static rt_thread_t app_mqtt_thread = RT_NULL; /* 定义一个MQTT客户端结构体 */ static MQTTClient client; /* 收到订阅的"Bear_Pi"主题的消息时的回调函数*/ static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data) { *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0'; rt_kprintf("Receive topic: %.*s, message data:\r\n", msg_data->topicName->lenstring.len, msg_data->topicName->lenstring.data); rt_kprintf("%.*s\r\n", msg_data->message->payloadlen, (char *)msg_data->message->payload); } /* 默认的订阅回调函数,如果有订阅的 Topic 没有设置回调函数,则使用该默认回调函数 */ static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data) { *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0'; rt_kprintf("Receive topic: %.*s, message data:\r\n", msg_data->topicName->lenstring.len, msg_data->topicName->lenstring.data); rt_kprintf("%.*s\r\n", msg_data->message->payloadlen, (char *)msg_data->message->payload); } /* 连接成功回调函数 */ static void mqtt_connect_callback(MQTTClient *c) { rt_kprintf("mqtt connect success! \r\n"); } /* 上线回调函数 */ static void mqtt_online_callback(MQTTClient *c) { rt_kprintf("mqtt online \r\n"); paho_mqtt_publish(&client, QOS1, "BearPi_Pub", "Hello, I am BearPi with RT_Thread."); } /* 下线回调函数 */ static void mqtt_offline_callback(MQTTClient *c) { rt_kprintf("mqtt offline \r\n"); } static void app_mqtt_thread_entry(void *parameter) { int count = 1; char msg_buf[128]; /* 对MQTT客户端结构体变量进行配置 */ client.isconnected = 0; client.uri = MQTT_Uri; /* 配置MQTT的连接参数 */ MQTTPacket_connectData condata = MQTTPacket_connectData_initializer; memcpy(&client.condata, &condata, sizeof(condata)); client.condata.clientID.cstring = ClientId; client.condata.keepAliveInterval = 30; client.condata.cleansession = 1; client.condata.username.cstring = UserName; client.condata.password.cstring = PassWord; /* 为mqtt申请内存 */ client.buf_size = client.readbuf_size = 1024; client.buf = rt_calloc(1, client.buf_size); client.readbuf = rt_calloc(1, client.readbuf_size); if (!(client.buf && client.readbuf)) { rt_kprintf("no memory for MQTT client buffer!\r\n"); return ; } /* 设置回调函数 */ client.connect_callback = mqtt_connect_callback; client.online_callback = mqtt_online_callback; client.offline_callback = mqtt_offline_callback; /* 订阅一个主题,并设置其回调函数 */ clienssageHandlers[0].topicFilter = rt_strdup("BearPi_Sub"); clienssageHandlers[0].callback = mqtt_sub_callback; clienssageHandlers[0].qos = QOS1; /* 设置默认的回调函数 */ client.defaultMessageHandler = mqtt_sub_default_callback; /* 启动 mqtt client */ paho_mqtt_start(&client); while(1) { rt_sprintf(msg_buf, "publish %d times.", count++); paho_mqtt_publish(&client, QOS1, "BearPi_Pub", msg_buf); rt_thread_mdelay(1000); } } static int app_mqtt_init(void) { rt_err_t rt_err; /* 创建MQTT线程*/ app_mqtt_thread = rt_thread_create("app_mqtt thread", app_mqtt_thread_entry, RT_NULL, 2048, 6, 10); /* 如果获得线程控制块,启动这个线程 */ if (app_mqtt_thread != RT_NULL) rt_err = rt_thread_startup(app_mqtt_thread); else rt_kprintf("app_mqtt_thread create failure !!! \n"); /* 判断线程是否启动成功 */ if( rt_err == RT_EOK) rt_kprintf("app_mqtt_thread startup ok. \n"); else rt_kprintf("app_mqtt_thread startup err. \n"); return rt_err; } INIT_APP_EXPORT(app_mqtt_init);

运行后可以收到主题消息: 在这里插入图片描述



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有