MQTT学习笔记(C#) 您所在的位置:网站首页 云端连接断开是什么意思 MQTT学习笔记(C#)

MQTT学习笔记(C#)

2024-07-02 01:59| 来源: 网络整理| 查看: 265

  代码地址:https://gitee.com/qq28069933146_admin/csharp_networkprotocol_research

  演示地址:C#-MQTT调用示例演示

一,什么是MQTT:

  MQTT(消息队列遥测传输)是IBM开发的即时通讯协议,是一个基于客户端-服务器的消息发布/订阅极其轻量级的消息传输协议。

  它工作在 TCP/IP协议族上,专门为网络受限设备、低宽带以及高延迟和不可靠的网络而设计的(传输特点:至多一次、至少一次、只有一次)。使其在物联网、小型设备、移动应用等方面有较广泛的应用。

 1.MQTT报文:

  详情见:https://www.cnblogs.com/hayasi/p/7743356.html

 2.MQTT三种身份:发布者(客户端)、代理(服务器)、订阅者(客户端)。

 

二,示例

  可以选择MQTTnet包或者DotNetty.Codecs.Mqtt包进行学习,这里我们选用Mqtt进行学习。

  VS2022

  Net5

  MQTTnet 4.1.4.563

 1.服务端:

  ①服务器对象初始化(new MqttFactory().CreateMqttServer(new MqttServerOptionsBuilder().Build());)

MqttServerOptionsBuilder mqttServerOptionsBuilder = new MqttServerOptionsBuilder(); // MQTT服务器配置 mqttServerOptionsBuilder.WithDefaultEndpoint(); mqttServerOptionsBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip)); // 设置Server的IP mqttServerOptionsBuilder.WithDefaultEndpointPort(port); // 设置Server的端口号 //mqttServerOptionsBuilder.WithEncryptedEndpointPort(port); // 使用加密的端点端口 mqttServerOptionsBuilder.WithPersistentSessions(withPersistentSessions); // 持续会话 mqttServerOptionsBuilder.WithConnectionBacklog(2000); // 最大连接数 //mqttServerOptionsBuilder.WithConnectionValidator(c => // 鉴权-方法失效 //{ // if (c.Username != uName || c.Password != uPwd) // { // c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; // } //}) MqttServerOptions mqttServerOptions = mqttServerOptionsBuilder.Build(); _MqttServer = new MqttFactory().CreateMqttServer(mqttServerOptions); // 创建服务(配置)

   补充:MqttServerOptionsBuilder的属性(注:有些方法4.1.4.563版本不支持了)

函数名 功能说明 Build() 构建配置参数 WithApplicationMessageInterceptor() 允许处理来自客户端的所有已发布消息 WithClientId() 服务端发布消息时使用的ClientId WithConnectionBacklog() 设置要保留的连接数 WithConnectionValidator() 验证连接 WithDefaultCommunicationTimeout() 设置默认的通信超时 WithDefaultEndpoint() 使用默认端点 WithDefaultEndpointBoundIPAddress() 使用默认端点IPv4地址 WithDefaultEndpointBoundIPV6Address() 使用默认端点IPv6地址 WithDefaultEndpointPort() 使用默认端点端口 WithEncryptedEndpoint() 使用加密的端点 WithEncryptedEndpointBoundIPAddress() 使用加密的端点IPv4地址 WithEncryptedEndpointBoundIPV6Address() 使用加密的端点IPv6地址 WithEncryptedEndpointPort() 使用加密的端点端口 WithEncryptionCertificate() 使用证书进行SSL连接 WithEncryptionSslProtocol() 使用SSL协议级别 WithMaxPendingMessagesPerClient() 每个客户端允许最多未决消息 WithPersistentSessions() 保持会话 WithStorage() 使用存储 WithSubscriptionInterceptor() 允许处理来自客户端的所有订阅 WithoutDefaultEndpoint() 禁用默认端点 WithoutEncryptedEndpoint() 禁用默认(SSL)端点

  ②服务器开启(await mqttServer.StartAsync())

await _MqttServer.StartAsync(); // 开启服务

  ③服务器关闭(await _MqttServer.StopAsync())

foreach (var clientStatus in _MqttServer.GetClientsAsync().Result) { await clientStatus.DisconnectAsync(); } await _MqttServer.StopAsync(); _MqttServer = null;

  ④是否对客户端的进行验证(账号密码-原方法失效)

//mqttServerOptionsBuilder.WithConnectionValidator(c => // 鉴权-方法失效 //{ // if (c.Username != uName || c.Password != uPwd) // { // c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; // } //})

  ⑤给客户端发送数据(原方法失效)

/// /// 发送消息-未写(原方法失效) /// /// 主题 /// 消息 /// public Task SedMessage(string Topic, string msg) { try { //var clients = _MqttServer.GetClientsAsync().Result; //foreach (var client in clients) //{ //} } catch { } return Task.CompletedTask; }

  ⑥获取所有的客户端(_MqttServer.GetClientsAsync().Result.ToList())

_MqttServer.GetClientsAsync().Result.ToList()

  ⑦服务器开启/关闭事件(StartedAsync与StoppedAsync)

_MqttServer.StartedAsync += StartedHandle; // 服务器开启事件 /// /// 开启Server的处理程序 /// private Task StartedHandle(EventArgs arg) { return Task.CompletedTask; } _MqttServer.StoppedAsync += StoppedHandle; // 服务器关闭事件 /// /// 关闭Server的处理程序 /// private Task StoppedHandle(EventArgs arg) { return Task.CompletedTask; }

  ⑧客户端连接/断开的处理事件

_MqttServer.ClientConnectedAsync += ClientConnectedHandle; // 设置客户端连接成功后的处理程序 /// /// 设置客户端连接成功后的处理程序 /// private Task ClientConnectedHandle(ClientConnectedEventArgs arg) { return Task.CompletedTask; } _MqttServer.ClientDisconnectedAsync += ClientDisconnectedHandle; // 设置客户端断开后的处理程序 /// /// 设置客户端断开后的处理程序 /// private Task ClientDisconnectedHandle(ClientDisconnectedEventArgs arg) { return Task.CompletedTask; }

  ⑨消息被订阅/被退订事件

_MqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopicHandle; // 设置消息订阅通知 _MqttServer.ClientUnsubscribedTopicAsync += ClientUnsubscribedTopicHandle; // 设置消息退订通知 /// /// 设置消息订阅通知 /// private Task ClientSubscribedTopicHandle(ClientSubscribedTopicEventArgs arg) { //if (!arg.Equals("admin")) //{ // var client = clients.Where(a => a.Id == arg.ClientId).FirstOrDefault(); // client?.DisconnectAsync(); // return Task.CompletedTask; //} return Task.CompletedTask; } /// /// 设置消息退订通知 /// private Task ClientUnsubscribedTopicHandle(ClientUnsubscribedTopicEventArgs arg) { return Task.CompletedTask; }

  ⑩接收到消息时的处理程序

_MqttServer.ApplicationMessageNotConsumedAsync += ApplicationMessageNotConsumedHandle; // 设置消息处理程序 /// /// 设置消息处理程序 /// private Task ApplicationMessageNotConsumedHandle(ApplicationMessageNotConsumedEventArgs arg) { _Callback?.Invoke(new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端:'{arg.SenderId}'发布了消息:主题:'{arg.ApplicationMessage.Topic}'!内容:'{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}';服务质量:{arg.ApplicationMessage.QualityOfServiceLevel};保留:{arg.ApplicationMessage.Retain}" }); return Task.CompletedTask; }  2.客户端:

  ①客户端对象初始化并连接(_MqttClient.ConnectAsync(new MqttClientOptionsBuilder().Build()))

MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder(); mqttClientOptionsBuilder.WithTcpServer(mqttServerUrl, port); // 设置MQTT服务器地址 if (!string.IsNullOrEmpty(userName)) { mqttClientOptionsBuilder.WithCredentials(userName, userPassword); // 设置鉴权参数 } mqttClientOptionsBuilder.WithClientId(Guid.NewGuid().ToString("N")); // 设置客户端序列号 MqttClientOptions options = mqttClientOptionsBuilder.Build(); _MqttClient = new MqttFactory().CreateMqttClient(); _MqttClient.ConnectedAsync += ConnectedHandle; // 服务器连接事件 _MqttClient.DisconnectedAsync += DisconnectedHandle; // 服务器断开事件(可以写入重连事件) _MqttClient.ApplicationMessageReceivedAsync += ApplicationMessageReceivedHandle; // 发送消息事件 await _MqttClient.ConnectAsync(options); // 连接

  ②与服务器断开连接(await _MqttClient.DisconnectAsync())

await _MqttClient.DisconnectAsync(); _MqttClient.Dispose(); _MqttClient = null;

  ③与服务器重新连接(await _MqttClient.ReconnectAsync())

await _MqttClient.ReconnectAsync();

  ④订阅与退订主题()

MqttTopicFilter topicFilter = new MqttTopicFilterBuilder().WithTopic(topic).Build(); await _MqttClient.SubscribeAsync(topicFilter, CancellationToken.None); // 订阅 await MqttClientExtensions.UnsubscribeAsync(_MqttClient, topic, CancellationToken.None); // 退订

  ⑤发布消息

MqttApplicationMessageBuilder mqttApplicationMessageBuilder = new MqttApplicationMessageBuilder(); // 设置内容 mqttApplicationMessageBuilder.WithTopic(topic); // 主题 mqttApplicationMessageBuilder.WithPayload(msg); // 信息 mqttApplicationMessageBuilder.WithRetainFlag(retained); // 保留 MqttApplicationMessage messageObj = mqttApplicationMessageBuilder.Build(); await _MqttClient.PublishAsync(messageObj, CancellationToken.None); // 发送

  ⑥与服务器连接/断开事件

_MqttClient.ConnectedAsync += ConnectedHandle; // 服务器连接事件 _MqttClient.DisconnectedAsync += DisconnectedHandle; // 服务器断开事件(可以写入重连事件) /// /// 服务器连接事件 /// private Task ConnectedHandle(MqttClientConnectedEventArgs arg) { return Task.CompletedTask; } /// /// 服务器断开事件(可以写入重连事件) /// private Task DisconnectedHandle(MqttClientDisconnectedEventArgs arg) { return Task.CompletedTask; }

  ⑦订阅/退订事件

// 略-原方法失效

  ⑧发送消息事件

_MqttClient.ApplicationMessageReceivedAsync += ApplicationMessageReceivedHandle; // 发送消息事件 /// /// 发送消息事件 /// private Task ApplicationMessageReceivedHandle(MqttApplicationMessageReceivedEventArgs arg) { string resultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient'{arg.ClientId}'内容:'{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}';主题:'{arg.ApplicationMessage.Topic}',消息等级Qos:[{arg.ApplicationMessage.QualityOfServiceLevel}],是否保留:[{arg.ApplicationMessage.Retain}]", return Task.CompletedTask; } 三、MQTTHelper /** *┌──────────────────────────────────────────────────────────────┐ *│ 描 述:MQTT通讯相关的工具类(MQTTnet 4.1.4.563) *│ 作 者:执笔小白 *│ 版 本:1.0 *│ 创建时间:2023-3-18 10:40:56 *└──────────────────────────────────────────────────────────────┘ *┌──────────────────────────────────────────────────────────────┐ *│ 命名空间: MqttnetServerWin *│ 类 名:MQTTHelper *└──────────────────────────────────────────────────────────────┘ */ using MQTTnet; using MQTTnet.Client; using MQTTnet.Packets; using MQTTnet.Server; using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; namespace MqttnetServerWin { /// /// MQTT通讯相关的工具类 /// public class MQTTHelper { #region 变量 /// /// 记录日志、输出、保存等操作 /// private Action? _Callback = null; #endregion 变量 #region Server /// /// MQTT服务 /// MqttServer _MqttServer = null; /// /// 创建MQTTServer并运行 /// public async Task CreateMQTTServerAndStart(MqttServerOptionsBuilder mqttServerOptionsBuilder, Action? callback) { ResultData_MQTT resultData_MQTT = new ResultData_MQTT(); _Callback = callback; try { MqttServerOptions mqttServerOptions = mqttServerOptionsBuilder.Build(); _MqttServer = new MqttFactory().CreateMqttServer(mqttServerOptions); // 创建服务(配置) _MqttServer.StartedAsync += StartedHandle; // 服务器开启事件 _MqttServer.StoppedAsync += StoppedHandle; // 服务器关闭事件 _MqttServer.ClientConnectedAsync += ClientConnectedHandle; // 设置客户端连接成功后的处理程序 _MqttServer.ClientDisconnectedAsync += ClientDisconnectedHandle; // 设置客户端断开后的处理程序 _MqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopicHandle; // 设置消息订阅通知 _MqttServer.ClientUnsubscribedTopicAsync += ClientUnsubscribedTopicHandle; // 设置消息退订通知 _MqttServer.ApplicationMessageNotConsumedAsync += ApplicationMessageNotConsumedHandle; // 设置消息处理程序 await _MqttServer.StartAsync(); // 开启服务 if (_MqttServer.IsStarted) { resultData_MQTT = new ResultData_MQTT() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_成功!" }; } else { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_失败!" }; } } catch (Exception ex) { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_失败!错误信息:" + ex.Message }; } _Callback?.Invoke(resultData_MQTT); return resultData_MQTT; } /// /// 简易创建MQTTServer并运行-不使用加密 /// /// IP /// 端口 /// 是否保持会话 /// 处理方法 /// public async Task CreateMQTTServerAndStart(string ip, int port, bool withPersistentSessions, Action? callback) { ResultData_MQTT resultData_MQTT = new ResultData_MQTT(); _Callback = callback; try { MqttServerOptionsBuilder mqttServerOptionsBuilder = new MqttServerOptionsBuilder(); // MQTT服务器配置 mqttServerOptionsBuilder.WithDefaultEndpoint(); mqttServerOptionsBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip)); // 设置Server的IP mqttServerOptionsBuilder.WithDefaultEndpointPort(port); // 设置Server的端口号 //mqttServerOptionsBuilder.WithEncryptedEndpointPort(port); // 使用加密的端点端口 mqttServerOptionsBuilder.WithPersistentSessions(withPersistentSessions); // 持续会话 mqttServerOptionsBuilder.WithConnectionBacklog(2000); // 最大连接数 //mqttServerOptionsBuilder.WithConnectionValidator(c => // 鉴权-方法失效 //{ // if (c.Username != uName || c.Password != uPwd) // { // c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; // } //}) MqttServerOptions mqttServerOptions = mqttServerOptionsBuilder.Build(); _MqttServer = new MqttFactory().CreateMqttServer(mqttServerOptions); // 创建服务(配置) _MqttServer.StartedAsync += StartedHandle; // 服务器开启事件 _MqttServer.StoppedAsync += StoppedHandle; // 服务器关闭事件 _MqttServer.ClientConnectedAsync += ClientConnectedHandle; // 设置客户端连接成功后的处理程序 _MqttServer.ClientDisconnectedAsync += ClientDisconnectedHandle; // 设置客户端断开后的处理程序 _MqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopicHandle; // 设置消息订阅通知 _MqttServer.ClientUnsubscribedTopicAsync += ClientUnsubscribedTopicHandle; // 设置消息退订通知 _MqttServer.ValidatingConnectionAsync += ValidatingConnectionHandle; // 鉴权-未完 _MqttServer.ApplicationMessageNotConsumedAsync += ApplicationMessageNotConsumedHandle; // 设置消息处理程序 await _MqttServer.StartAsync(); // 开启服务 if (_MqttServer.IsStarted) { resultData_MQTT = new ResultData_MQTT() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_成功!" }; } else { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_失败!" }; } } catch (Exception ex) { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTServer_失败!错误信息:" + ex.Message }; } _Callback?.Invoke(resultData_MQTT); return resultData_MQTT; } /// /// 关闭MQTTServer /// public async Task StopMQTTServer() { ResultData_MQTT resultData_MQTT = new ResultData_MQTT(); try { if (_MqttServer == null) { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTServer_出错!MQTTServer未在运行。" }; } else { foreach (var clientStatus in _MqttServer.GetClientsAsync().Result) { await clientStatus.DisconnectAsync(); } await _MqttServer.StopAsync(); _MqttServer = null; resultData_MQTT = new ResultData_MQTT() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTServer_成功!" }; } } catch (Exception ex) { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTServer_失败!错误信息:" + ex.Message }; } _Callback?.Invoke(resultData_MQTT); return resultData_MQTT; } /// /// 获取所有的客户端 /// public List GetClientsAsync() { return _MqttServer.GetClientsAsync().Result.ToList(); } /// /// 发送消息-未写 /// /// 主题 /// 消息 /// public Task SedMessage(string Topic, string msg) { try { //var clients = _MqttServer.GetClientsAsync().Result; //foreach (var client in clients) //{ //} } catch { } return Task.CompletedTask; } #region 处理事件 /// /// 开启Server的处理程序 /// private Task StartedHandle(EventArgs arg) { _Callback?.Invoke(new() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>MQTTServer已开启!" }); return Task.CompletedTask; } /// /// 关闭Server的处理程序 /// private Task StoppedHandle(EventArgs arg) { _Callback?.Invoke(new() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>MQTTServer已关闭!" }); return Task.CompletedTask; } /// /// 设置客户端连接成功后的处理程序 /// private Task ClientConnectedHandle(ClientConnectedEventArgs arg) { var clients = _MqttServer.GetClientsAsync().Result; _Callback?.Invoke(new() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端'{arg.ClientId}'已成功连接!当前客户端连接数:{clients?.Count}个。" }); return Task.CompletedTask; } /// /// 设置客户端断开后的处理程序 /// private Task ClientDisconnectedHandle(ClientDisconnectedEventArgs arg) { var clients = _MqttServer.GetClientsAsync().Result; _Callback?.Invoke(new ResultData_MQTT() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端'{arg.ClientId}'已断开连接!当前客户端连接数:{clients?.Count}个。" }); return Task.CompletedTask; } /// /// 设置消息订阅通知 /// private Task ClientSubscribedTopicHandle(ClientSubscribedTopicEventArgs arg) { //if (!arg.Equals("admin")) //{ // var client = clients.Where(a => a.Id == arg.ClientId).FirstOrDefault(); // client?.DisconnectAsync(); // return Task.CompletedTask; //} _Callback?.Invoke(new ResultData_MQTT() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端'{arg.ClientId}'订阅了主题'{arg.TopicFilter.Topic}',主题服务质量:'{arg.TopicFilter.QualityOfServiceLevel}'!" }); return Task.CompletedTask; } /// /// 设置消息退订通知 /// private Task ClientUnsubscribedTopicHandle(ClientUnsubscribedTopicEventArgs arg) { _Callback?.Invoke(new ResultData_MQTT() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端{arg.ClientId}退订了主题{arg.TopicFilter}!" }); return Task.CompletedTask; } /// /// 鉴权-未写完 /// /// private Task ValidatingConnectionHandle(ValidatingConnectionEventArgs arg) // 鉴权 { if (arg.UserName != "Admin" || arg.Password != "Admin123") { } return Task.CompletedTask; } /// /// 设置消息处理程序 /// private Task ApplicationMessageNotConsumedHandle(ApplicationMessageNotConsumedEventArgs arg) { _Callback?.Invoke(new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客户端:'{arg.SenderId}'发布了消息:主题:'{arg.ApplicationMessage.Topic}'!内容:'{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}';服务质量:{arg.ApplicationMessage.QualityOfServiceLevel};保留:{arg.ApplicationMessage.Retain}" }); return Task.CompletedTask; } #endregion 处理事件 #endregion Server #region Client /// /// 客户端 /// IMqttClient _MqttClient = null; /// /// 创建MQTTClient并运行 /// /// MQTTClient连接配置 /// 信息处理逻辑 /// public async Task CreateMQTTClientAndStart(MqttClientOptionsBuilder mqttClientOptionsBuilder, Action? callback) { ResultData_MQTT resultData_MQTT = new ResultData_MQTT(); _Callback = callback; try { MqttClientOptions options = mqttClientOptionsBuilder.Build(); _MqttClient = new MqttFactory().CreateMqttClient(); _MqttClient.ConnectedAsync += ConnectedHandle; // 服务器连接事件 _MqttClient.DisconnectedAsync += DisconnectedHandle; // 服务器断开事件(可以写入重连事件) _MqttClient.ApplicationMessageReceivedAsync += ApplicationMessageReceivedHandle; // 发送消息事件 await _MqttClient.ConnectAsync(options); // 连接 if (_MqttClient.IsConnected) { resultData_MQTT = new ResultData_MQTT() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTClient_成功!" }; } else { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTClient_失败!" }; } } catch (Exception ex) { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTClient_失败!错误信息:" + ex.Message }; } _Callback?.Invoke(resultData_MQTT); return resultData_MQTT; } /// /// 简易创建MQTTClient并运行 /// /// mqttServer的Url /// mqttServer的端口 /// 认证用用户名 /// 认证用密码 /// 信息处理逻辑 /// public async Task CreateMQTTClientAndStart(string mqttServerUrl, int port, string userName, string userPassword, Action? callback) { ResultData_MQTT resultData_MQTT = new ResultData_MQTT(); _Callback = callback; try { MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder(); mqttClientOptionsBuilder.WithTcpServer(mqttServerUrl, port); // 设置MQTT服务器地址 if (!string.IsNullOrEmpty(userName)) { mqttClientOptionsBuilder.WithCredentials(userName, userPassword); // 设置鉴权参数 } mqttClientOptionsBuilder.WithClientId(Guid.NewGuid().ToString("N")); // 设置客户端序列号 MqttClientOptions options = mqttClientOptionsBuilder.Build(); _MqttClient = new MqttFactory().CreateMqttClient(); _MqttClient.ConnectedAsync += ConnectedHandle; // 服务器连接事件 _MqttClient.DisconnectedAsync += DisconnectedHandle; // 服务器断开事件(可以写入重连事件) _MqttClient.ApplicationMessageReceivedAsync += ApplicationMessageReceivedHandle; // 发送消息事件 await _MqttClient.ConnectAsync(options); // 连接 if (_MqttClient.IsConnected) { resultData_MQTT = new ResultData_MQTT() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTClient_成功!" }; } else { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTClient_失败!" }; } } catch (Exception ex) { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了开启MQTTClient_失败!错误信息:" + ex.Message }; } _Callback?.Invoke(resultData_MQTT); return resultData_MQTT; } /// /// 关闭MQTTClient /// public async Task DisconnectAsync_Client() { ResultData_MQTT resultData_MQTT = new ResultData_MQTT(); try { if (_MqttClient != null && _MqttClient.IsConnected) { await _MqttClient.DisconnectAsync(); _MqttClient.Dispose(); _MqttClient = null; resultData_MQTT = new ResultData_MQTT() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTClient_成功!" }; } else { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTClient_失败!MQTTClient未开启连接!" }; } } catch (Exception ex) { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了关闭MQTTClient_失败!错误信息:" + ex.Message }; } _Callback?.Invoke(resultData_MQTT); } /// /// 重连 /// /// public async Task ReconnectAsync_Client() { ResultData_MQTT resultData_MQTT = new ResultData_MQTT(); try { if (_MqttClient != null) { await _MqttClient.ReconnectAsync(); resultData_MQTT = new ResultData_MQTT() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了MQTTClient重连_成功!" }; } else { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了MQTTClient重连_失败!未设置MQTTClient连接!" }; } } catch (Exception ex) { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了MQTTClient重连_失败!错误信息:" + ex.Message }; } _Callback?.Invoke(resultData_MQTT); } /// /// 订阅 /// /// 主题 public async void SubscribeAsync_Client(string topic) { ResultData_MQTT resultData_MQTT = new ResultData_MQTT(); try { MqttTopicFilter topicFilter = new MqttTopicFilterBuilder().WithTopic(topic).Build(); await _MqttClient.SubscribeAsync(topicFilter, CancellationToken.None); resultData_MQTT = new ResultData_MQTT() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient执行了订阅'{topic}'_成功!" }; } catch (Exception ex) { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient执行了订阅'{topic}'_失败!错误信息:" + ex.Message }; } _Callback?.Invoke(resultData_MQTT); } /// /// 退订阅 /// /// 主题 public async void UnsubscribeAsync_Client(string topic) { ResultData_MQTT resultData_MQTT = new ResultData_MQTT(); try { await MqttClientExtensions.UnsubscribeAsync(_MqttClient, topic, CancellationToken.None); resultData_MQTT = new ResultData_MQTT() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient执行了退订'{topic}'_成功!" }; } catch (Exception ex) { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient执行退订'{topic}'_失败!错误信息:" + ex.Message }; } _Callback?.Invoke(resultData_MQTT); } /// /// 发布消息( 必须在成功连接以后才生效 ) /// /// 主题 /// 信息 /// 是否保留 /// public async Task PublishAsync_Client(string topic, string msg, bool retained) { ResultData_MQTT resultData_MQTT = new ResultData_MQTT(); try { MqttApplicationMessageBuilder mqttApplicationMessageBuilder = new MqttApplicationMessageBuilder(); mqttApplicationMessageBuilder.WithTopic(topic); // 主题 mqttApplicationMessageBuilder.WithPayload(msg); // 信息 mqttApplicationMessageBuilder.WithRetainFlag(retained); // 保留 MqttApplicationMessage messageObj = mqttApplicationMessageBuilder.Build(); if (_MqttClient.IsConnected) { await _MqttClient.PublishAsync(messageObj, CancellationToken.None); resultData_MQTT = new ResultData_MQTT() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>执行了发布信息_成功!主题:'{topic}',信息:'{msg}',是否保留:'{retained}'" }; } else { // 未连接 resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了发布信息_失败!MQTTClient未开启连接!" }; } } catch (Exception ex) { resultData_MQTT = new ResultData_MQTT() { ResultCode = -1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>执行了发布信息_失败!错误信息:" + ex.Message }; } _Callback?.Invoke(resultData_MQTT); } #region 事件 /// /// 服务器连接事件 /// private Task ConnectedHandle(MqttClientConnectedEventArgs arg) { _Callback?.Invoke(new ResultData_MQTT() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>已连接到MQTT服务器!" }); return Task.CompletedTask; } /// /// 服务器断开事件(可以写入重连事件) /// private Task DisconnectedHandle(MqttClientDisconnectedEventArgs arg) { _Callback?.Invoke(new ResultData_MQTT() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>已断开与MQTT服务器连接!" }); return Task.CompletedTask; } /// /// 发送消息事件 /// private Task ApplicationMessageReceivedHandle(MqttApplicationMessageReceivedEventArgs arg) { _Callback?.Invoke(new ResultData_MQTT() { ResultCode = 1, ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient'{arg.ClientId}'内容:'{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}';主题:'{arg.ApplicationMessage.Topic}',消息等级Qos:[{arg.ApplicationMessage.QualityOfServiceLevel}],是否保留:[{arg.ApplicationMessage.Retain}]", ResultObject1 = arg.ApplicationMessage.Topic, ResultObject2 = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload) }); return Task.CompletedTask; } #endregion 事件 #endregion Client } /// /// 信息载体 /// public class ResultData_MQTT { /// /// 结果Code /// 正常1,其他为异常;0不作为回复结果 /// public int ResultCode { get; set; } = 0; /// /// 结果信息 /// public string ResultMsg { get; set; } = string.Empty; /// /// 扩展1 /// public object? ResultObject1 { get; set; } = string.Empty; /// /// 扩展2 /// public object? ResultObject2 { get; set; } = string.Empty; } } 四、分布式MQTT消息服务器推荐

  EMQ X(简称 EMQ):是一款完全开源,高度可伸缩,高可用的分布式 MQTT 消息服务器,同时也支持 CoAP/LwM2M 一站式 IoT 协议接入。可处理千万级别的并发客户端。

 

参考:

  https://www.cnblogs.com/hayasi/p/7743356.html

  https://www.cnblogs.com/dathlin/p/11631894.html

  https://www.cnblogs.com/sxkgeek/p/9140180.html

  https://www.shangmayuan.com/a/67b2f4a9f2c440e9b2db3a97.html

  https://zhuanlan.zhihu.com/p/419561816

  https://blog.csdn.net/qq_37258787/article/details/80183923

  https://www.cnblogs.com/dathlin/p/11631894.html

  https://www.jianshu.com/p/a371c6ac076b

本文来自博客园,作者:꧁执笔小白꧂,转载请注明原文链接:https://www.cnblogs.com/qq2806933146xiaobai/articles/15850366.html



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有