java mqtt 断线重连,mqtt+java+spring(含断线重连) 您所在的位置:网站首页 mqtt重连机制 java mqtt 断线重连,mqtt+java+spring(含断线重连)

java mqtt 断线重连,mqtt+java+spring(含断线重连)

2023-03-16 06:43| 来源: 网络整理| 查看: 265

一.pom.xml中添加mqtt的依赖:

org.springframework.boot

spring-boot-starter-integration

org.springframework.integration

spring-integration-stream

org.springframework.integration

spring-integration-mqtt

二.application.properties配置文件中添加配置(其中MQTT的服务端使用的是ActiveMQ):

## MQTT##

mqtt.host=tcp://127.0.0.1:1883

mqtt.clientId=mqttClient

mqtt.username=admin

mqtt.password=admin

mqtt.topicName1=topic1

mqtt.topicName2=topic2

mqtt.timeout=1000

mqtt.keepalive=2000

三.编码自己的MQTT客户端:

package mqtt2;

import org.eclipse.paho.client.mqttv3.*;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

/**

* description:

* author: yangzihe

* date: 2018-12-17 17:33

**/

public class MyMQTTClient {

private static final Logger LOGGER = LoggerFactory.getLogger(MyMQTTClient.class);

private static MqttClient client;

public static MqttClient getClient() {

return client;

}

public static void setClient(MqttClient client) {

MyMQTTClient.client = client;

}

private String host;

private String username;

private String password;

private String clientId;

private int timeout;

private int keepalive;

public MyMQTTClient(String host, String username, String password, String clientId, int timeOut, int keepAlive) {

this.host = host;

this.username = username;

this.password = password;

this.clientId = clientId;

this.timeout = timeOut;

this.keepalive = keepAlive;

}

/**

* 设置mqtt连接参数

*

* @param username

* @param password

* @param timeout

* @param keepalive

* @return

*/

public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {

MqttConnectOptions options = new MqttConnectOptions();

options.setUserName(username);

options.setPassword(password.toCharArray());

options.setConnectionTimeout(timeout);

options.setKeepAliveInterval(keepalive);

options.setCleanSession(false);

return options;

}

/**

* 连接mqtt服务端,得到MqttClient连接对象

*/

public void connect() throws MqttException {

if (client == null) {

client = new MqttClient(host, clientId, new MemoryPersistence());

client.setCallback(new MyMQTTCallback(MyMQTTClient.this));

}

MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);

if (!client.isConnected()) {

client.connect(mqttConnectOptions);

} else {

client.disconnect();

client.connect(mqttConnectOptions);

}

LOGGER.info("MQTT connect success");//未发生异常,则连接成功

}

/**

* 发布,默认qos为0,非持久化

*

* @param pushMessage

* @param topic

*/

public void publish(String pushMessage, String topic) {

publish(pushMessage, topic, 0, false);

}

/**

* 发布消息

*

* @param pushMessage

* @param topic

* @param qos

* @param retained:留存

*/

public void publish(String pushMessage, String topic, int qos, boolean retained) {

MqttMessage message = new MqttMessage();

message.setPayload(pushMessage.getBytes());

message.setQos(qos);

message.setRetained(retained);

MqttTopic mqttTopic = MyMQTTClient.getClient().getTopic(topic);

if (null == mqttTopic) {

LOGGER.error("topic is not exist");

}

MqttDeliveryToken token;//Delivery:配送

synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充

try {

token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件

token.waitForCompletion(1000L);

} catch (MqttPersistenceException e) {

e.printStackTrace();

} catch (MqttException e) {

e.printStackTrace();

}

}

}

/**

* 订阅某个主题,qos默认为0

*

* @param topic

*/

public void subscribe(String topic) {

subscribe(topic, 0);

}

/**

* 订阅某个主题

*

* @param topic

* @param qos

*/

public void subscribe(String topic, int qos) {

try {

MyMQTTClient.getClient().subscribe(topic, qos);

} catch (MqttException e) {

e.printStackTrace();

}

}

}

四.编码自己的MQTT回调对象:

package mqtt2;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.MqttCallback;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

/**

* description:

* author: yangzihe

* date: 2018-12-17 17:37

**/

public class MyMQTTCallback implements MqttCallback {

private static final Logger LOGGER = LoggerFactory.getLogger(MyMQTTCallback.class);

private MyMQTTClient myMQTTClient;

public MyMQTTCallback(MyMQTTClient myMQTTClient) {

this.myMQTTClient = myMQTTClient;

}

/**

* 丢失连接,可在这里做重连

* 只会调用一次

*

* @param throwable

*/

@Override

public void connectionLost(Throwable throwable) {

LOGGER.error("连接断开,下面做重连...");

long reconnectTimes = 1;

while (true) {

try {

if (MyMQTTClient.getClient().isConnected()) {

LOGGER.warn("mqtt reconnect success end");

return;

}

LOGGER.warn("mqtt reconnect times = {} try again...", reconnectTimes++);

MyMQTTClient.getClient().reconnect();

} catch (MqttException e) {

LOGGER.error("", e);

}

try {

Thread.sleep(1000);

} catch (InterruptedException e1) {

//e1.printStackTrace();

}

}

}

/**

* 消息到达后

* subscribe后,执行的回调函数

*

* @param s

* @param mqttMessage

* @throws Exception

*/

@Override

public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {

// subscribe后得到的消息会执行到这里面

LOGGER.info("接收消息主题 : {},接收消息内容 : {}", s, new String(mqttMessage.getPayload()));

}

/**

* publish后,配送完成后回调的方法

*

* @param iMqttDeliveryToken

*/

@Override

public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

// LOGGER.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());

}

}

五.编码MQTT配置类,将自己的MQTTClient客户端对象注入到spring容器中:

package mqtt2;

import lombok.extern.slf4j.Slf4j;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* description:

* author: yangzihe

* date: 2018-12-17 17:36

**/

@Configuration

@Slf4j

public class MqttConfiguration {

@Value("${mqtt.host}")

String host;

@Value("${mqtt.username}")

String username;

@Value("${mqtt.password}")

String password;

@Value("${mqtt.clientId}")

String clientId;

@Value("${mqtt.timeout}")

int timeOut;

@Value("${mqtt.keepalive}")

int keepAlive;

@Bean//注入spring

public MyMQTTClient myMQTTClient() {

MyMQTTClient myMQTTClient = new MyMQTTClient(host, clientId, username, password, timeOut, keepAlive);

for (int i = 0; i < 10; i++) {

try {

myMQTTClient.connect();

return myMQTTClient;

} catch (MqttException e) {

log.error("MQTT connect exception,connect time = " + i);

try {

Thread.sleep(2000);

} catch (InterruptedException e1) {

e1.printStackTrace();

}

//e.printStackTrace();

}

}

return myMQTTClient;

}

}

六.编写测试类,测试发布以及订阅:

package demo;

import mqtt2.MyMQTTClient;

import mqtt2.SpringBootApplicationMQTT2;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import org.springframework.test.context.web.WebAppConfiguration;

/**

* description:

* author: yangzihe

* date: 2018-12-17 17:37

**/

@SpringBootTest(classes = SpringBootApplicationMQTT2.class)

@RunWith(SpringJUnit4ClassRunner.class)

@WebAppConfiguration

public class MQTTTest2 {

@Autowired

private MyMQTTClient myMQTTClient;

@Test

public void testProduce() throws Exception {

long l = System.nanoTime();

for (int i = 0; i < 20000; i++) {

String topicName = "topic11";

//myMQTTClient.subscribe(topicName);

myMQTTClient.publish(topicName + "发送消息" + i, topicName);

}

long l1 = System.nanoTime();

double d = ((double) (l1 - l)) / 1000000000;

long l2 = System.nanoTime();

for (int i = 0; i < 20000; i++) {

String topicName = "topic22";

//myMQTTClient.subscribe(topicName);

myMQTTClient.publish(topicName + "发送消息" + i, topicName);

}

double d2 = ((double) (System.nanoTime() - l2)) / 1000000000;

System.err.println("=====================第一个topic发送2万数据花费时间:=================" + d + "秒");//2秒多

System.err.println("=====================第二个topic发送2万数据花费时间:=================" + d2 + "秒");

}

/**

* 分级发布与订阅

* @throws Exception

*/

@Test

public void testTopic() throws Exception {

int count = 10;

String topicName1 = "topic33/type/name";

String topicName2 = "topic33/type";

String topicName3 = "topic33";

myMQTTClient.subscribe(topicName3+"/#");

for (int i = 0; i < count; i++) {

myMQTTClient.publish(topicName1 + "发送消息" + i, topicName1);

}

for (int i = 0; i < count; i++) {

myMQTTClient.publish(topicName2 + "发送消息" + i, topicName2);

}

for (int i = 0; i < count; i++) {

myMQTTClient.publish(topicName3 + "发送消息" + i, topicName3);

}

}

}

其中SpringBootApplicationMQTT2是springboot的启动类:

package mqtt2;

import org.springframework.boot.SpringApplication;

/**

* ClassName: SpringBootApplication

* description:

* author: yangzihe

* date: 2018-09-30 09:15

**/

@org.springframework.boot.autoconfigure.SpringBootApplication//@EnableAutoConfiguration @ComponentScan

public class SpringBootApplicationMQTT2 {

public static void main(String[] args) {

SpringApplication.run(SpringBootApplicationMQTT2.class, args);

}

}

运行结果:

4722548781f5f8268584139eedb35b67.png

参考博客:https://blog.csdn.net/hao114500043/article/details/81742849

补充:为什么publish时一定要同步?

synchronized (this) {

try {

token = mqttTopic.publish(message);

token.waitForCompletion(1000L);

} catch (MqttPersistenceException e) {

e.printStackTrace();

} catch (MqttException e) {

e.printStackTrace();

}

}

查看mqttTopic.publish(message)源码:

public MqttDeliveryToken publish(MqttMessage message) throws MqttException, MqttPersistenceException {

MqttDeliveryToken token = new MqttDeliveryToken(this.comms.getClient().getClientId());

token.setMessage(message);

this.comms.sendNoWait(this.createPublish(message), token);

token.internalTok.waitUntilSent();

return token;

}

每次publish都是new MqttDeliveryToken,然后查看token.internalTok.waitUntilSent()源码:

public void waitUntilSent() throws MqttException {

Object var1 = this.sentLock;

synchronized(this.sentLock) {

Object var2 = this.responseLock;

synchronized(this.responseLock) {

if (this.exception != null) {

throw this.exception;

}

}

while(!this.sent) {

try {

log.fine(CLASS_NAME, "waitUntilSent", "409", new Object[]{this.getKey()});

//消息未发送到队列中,sentLock就wait,线程进入等待,同时释放了锁sentLock

this.sentLock.wait();

} catch (InterruptedException var3) {

;

}

}

if (!this.sent) {

if (this.exception == null) {

throw ExceptionHelper.createMqttException(6);

} else {

throw this.exception;

}

}

}

}

this.sentLock.wait()在当前线程还未将消息发送到队列中去时,当前线程进入等待状态,在有其他线程唤醒时,代码继续往下执行。一直到消息发送到队列中标识sent为true时,线程结束循环,结束函数。若没有其他线程唤醒或者消息发送标识sent=false,线程则一直等待下去。

因为是new MqttDeliveryToken,Token也是new的,所以sent不会受多线程影响(每条消息有着自己的sent)。

所以猜测是没有线程唤醒本线程,那么什么时候才会有线程唤醒本线程?

猜测:        this.comms.sendNoWait(this.createPublish(message), token);    中有taskExecutor去执行“将消息发送到队列中”task,task执行完成后,会将sent=true,然后notifyAll()。然而,在多线程的时候,this.comms.sendNoWait(this.createPublish(message), token)的this是同一个MqttTopic时(topic名相同时),线程1等待时被线程2抢到cpu执行权,线程2会将taskExecutor执行的task更新为线程2的task,线程1再执行时没有了task,不会notifyAll(),就会一直等待下去了。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有