java mqtt 断线重连,mqtt+java+spring(含断线重连) | 您所在的位置:网站首页 › mqtt重连机制 › java mqtt 断线重连,mqtt+java+spring(含断线重连) |
一.pom.xml中添加mqtt的依赖: org.springframework.boot spring-boot-starter-integration org.springframework.integration spring-integration-stream org.springframework.integration spring-integration-mqtt 二.application.properties配置文件中添加配置(其中MQTT的服务端使用的是ActiveMQ): ## MQTT## mqtt.host=tcp://127.0.0.1:1883 mqtt.clientId=mqttClient mqtt.username=admin mqtt.password=admin mqtt.topicName1=topic1 mqtt.topicName2=topic2 mqtt.timeout=1000 mqtt.keepalive=2000 三.编码自己的MQTT客户端: package mqtt2; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * description: * author: yangzihe * date: 2018-12-17 17:33 **/ public class MyMQTTClient { private static final Logger LOGGER = LoggerFactory.getLogger(MyMQTTClient.class); private static MqttClient client; public static MqttClient getClient() { return client; } public static void setClient(MqttClient client) { MyMQTTClient.client = client; } private String host; private String username; private String password; private String clientId; private int timeout; private int keepalive; public MyMQTTClient(String host, String username, String password, String clientId, int timeOut, int keepAlive) { this.host = host; this.username = username; this.password = password; this.clientId = clientId; this.timeout = timeOut; this.keepalive = keepAlive; } /** * 设置mqtt连接参数 * * @param username * @param password * @param timeout * @param keepalive * @return */ public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); options.setCleanSession(false); return options; } /** * 连接mqtt服务端,得到MqttClient连接对象 */ public void connect() throws MqttException { if (client == null) { client = new MqttClient(host, clientId, new MemoryPersistence()); client.setCallback(new MyMQTTCallback(MyMQTTClient.this)); } MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive); if (!client.isConnected()) { client.connect(mqttConnectOptions); } else { client.disconnect(); client.connect(mqttConnectOptions); } LOGGER.info("MQTT connect success");//未发生异常,则连接成功 } /** * 发布,默认qos为0,非持久化 * * @param pushMessage * @param topic */ public void publish(String pushMessage, String topic) { publish(pushMessage, topic, 0, false); } /** * 发布消息 * * @param pushMessage * @param topic * @param qos * @param retained:留存 */ public void publish(String pushMessage, String topic, int qos, boolean retained) { MqttMessage message = new MqttMessage(); message.setPayload(pushMessage.getBytes()); message.setQos(qos); message.setRetained(retained); MqttTopic mqttTopic = MyMQTTClient.getClient().getTopic(topic); if (null == mqttTopic) { LOGGER.error("topic is not exist"); } MqttDeliveryToken token;//Delivery:配送 synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充 try { token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件 token.waitForCompletion(1000L); } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } } /** * 订阅某个主题,qos默认为0 * * @param topic */ public void subscribe(String topic) { subscribe(topic, 0); } /** * 订阅某个主题 * * @param topic * @param qos */ public void subscribe(String topic, int qos) { try { MyMQTTClient.getClient().subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } } 四.编码自己的MQTT回调对象: package mqtt2; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * description: * author: yangzihe * date: 2018-12-17 17:37 **/ public class MyMQTTCallback implements MqttCallback { private static final Logger LOGGER = LoggerFactory.getLogger(MyMQTTCallback.class); private MyMQTTClient myMQTTClient; public MyMQTTCallback(MyMQTTClient myMQTTClient) { this.myMQTTClient = myMQTTClient; } /** * 丢失连接,可在这里做重连 * 只会调用一次 * * @param throwable */ @Override public void connectionLost(Throwable throwable) { LOGGER.error("连接断开,下面做重连..."); long reconnectTimes = 1; while (true) { try { if (MyMQTTClient.getClient().isConnected()) { LOGGER.warn("mqtt reconnect success end"); return; } LOGGER.warn("mqtt reconnect times = {} try again...", reconnectTimes++); MyMQTTClient.getClient().reconnect(); } catch (MqttException e) { LOGGER.error("", e); } try { Thread.sleep(1000); } catch (InterruptedException e1) { //e1.printStackTrace(); } } } /** * 消息到达后 * subscribe后,执行的回调函数 * * @param s * @param mqttMessage * @throws Exception */ @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { // subscribe后得到的消息会执行到这里面 LOGGER.info("接收消息主题 : {},接收消息内容 : {}", s, new String(mqttMessage.getPayload())); } /** * publish后,配送完成后回调的方法 * * @param iMqttDeliveryToken */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { // LOGGER.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete()); } } 五.编码MQTT配置类,将自己的MQTTClient客户端对象注入到spring容器中: package mqtt2; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttException; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * description: * author: yangzihe * date: 2018-12-17 17:36 **/ @Configuration @Slf4j public class MqttConfiguration { @Value("${mqtt.host}") String host; @Value("${mqtt.username}") String username; @Value("${mqtt.password}") String password; @Value("${mqtt.clientId}") String clientId; @Value("${mqtt.timeout}") int timeOut; @Value("${mqtt.keepalive}") int keepAlive; @Bean//注入spring public MyMQTTClient myMQTTClient() { MyMQTTClient myMQTTClient = new MyMQTTClient(host, clientId, username, password, timeOut, keepAlive); for (int i = 0; i < 10; i++) { try { myMQTTClient.connect(); return myMQTTClient; } catch (MqttException e) { log.error("MQTT connect exception,connect time = " + i); try { Thread.sleep(2000); } catch (InterruptedException e1) { e1.printStackTrace(); } //e.printStackTrace(); } } return myMQTTClient; } } 六.编写测试类,测试发布以及订阅: package demo; import mqtt2.MyMQTTClient; import mqtt2.SpringBootApplicationMQTT2; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.test.context.web.WebAppConfiguration; /** * description: * author: yangzihe * date: 2018-12-17 17:37 **/ @SpringBootTest(classes = SpringBootApplicationMQTT2.class) @RunWith(SpringJUnit4ClassRunner.class) @WebAppConfiguration public class MQTTTest2 { @Autowired private MyMQTTClient myMQTTClient; @Test public void testProduce() throws Exception { long l = System.nanoTime(); for (int i = 0; i < 20000; i++) { String topicName = "topic11"; //myMQTTClient.subscribe(topicName); myMQTTClient.publish(topicName + "发送消息" + i, topicName); } long l1 = System.nanoTime(); double d = ((double) (l1 - l)) / 1000000000; long l2 = System.nanoTime(); for (int i = 0; i < 20000; i++) { String topicName = "topic22"; //myMQTTClient.subscribe(topicName); myMQTTClient.publish(topicName + "发送消息" + i, topicName); } double d2 = ((double) (System.nanoTime() - l2)) / 1000000000; System.err.println("=====================第一个topic发送2万数据花费时间:=================" + d + "秒");//2秒多 System.err.println("=====================第二个topic发送2万数据花费时间:=================" + d2 + "秒"); } /** * 分级发布与订阅 * @throws Exception */ @Test public void testTopic() throws Exception { int count = 10; String topicName1 = "topic33/type/name"; String topicName2 = "topic33/type"; String topicName3 = "topic33"; myMQTTClient.subscribe(topicName3+"/#"); for (int i = 0; i < count; i++) { myMQTTClient.publish(topicName1 + "发送消息" + i, topicName1); } for (int i = 0; i < count; i++) { myMQTTClient.publish(topicName2 + "发送消息" + i, topicName2); } for (int i = 0; i < count; i++) { myMQTTClient.publish(topicName3 + "发送消息" + i, topicName3); } } } 其中SpringBootApplicationMQTT2是springboot的启动类: package mqtt2; import org.springframework.boot.SpringApplication; /** * ClassName: SpringBootApplication * description: * author: yangzihe * date: 2018-09-30 09:15 **/ @org.springframework.boot.autoconfigure.SpringBootApplication//@EnableAutoConfiguration @ComponentScan public class SpringBootApplicationMQTT2 { public static void main(String[] args) { SpringApplication.run(SpringBootApplicationMQTT2.class, args); } } 运行结果: 参考博客:https://blog.csdn.net/hao114500043/article/details/81742849 补充:为什么publish时一定要同步? synchronized (this) { try { token = mqttTopic.publish(message); token.waitForCompletion(1000L); } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } 查看mqttTopic.publish(message)源码: public MqttDeliveryToken publish(MqttMessage message) throws MqttException, MqttPersistenceException { MqttDeliveryToken token = new MqttDeliveryToken(this.comms.getClient().getClientId()); token.setMessage(message); this.comms.sendNoWait(this.createPublish(message), token); token.internalTok.waitUntilSent(); return token; } 每次publish都是new MqttDeliveryToken,然后查看token.internalTok.waitUntilSent()源码: public void waitUntilSent() throws MqttException { Object var1 = this.sentLock; synchronized(this.sentLock) { Object var2 = this.responseLock; synchronized(this.responseLock) { if (this.exception != null) { throw this.exception; } } while(!this.sent) { try { log.fine(CLASS_NAME, "waitUntilSent", "409", new Object[]{this.getKey()}); //消息未发送到队列中,sentLock就wait,线程进入等待,同时释放了锁sentLock this.sentLock.wait(); } catch (InterruptedException var3) { ; } } if (!this.sent) { if (this.exception == null) { throw ExceptionHelper.createMqttException(6); } else { throw this.exception; } } } } this.sentLock.wait()在当前线程还未将消息发送到队列中去时,当前线程进入等待状态,在有其他线程唤醒时,代码继续往下执行。一直到消息发送到队列中标识sent为true时,线程结束循环,结束函数。若没有其他线程唤醒或者消息发送标识sent=false,线程则一直等待下去。 因为是new MqttDeliveryToken,Token也是new的,所以sent不会受多线程影响(每条消息有着自己的sent)。 所以猜测是没有线程唤醒本线程,那么什么时候才会有线程唤醒本线程? 猜测: this.comms.sendNoWait(this.createPublish(message), token); 中有taskExecutor去执行“将消息发送到队列中”task,task执行完成后,会将sent=true,然后notifyAll()。然而,在多线程的时候,this.comms.sendNoWait(this.createPublish(message), token)的this是同一个MqttTopic时(topic名相同时),线程1等待时被线程2抢到cpu执行权,线程2会将taskExecutor执行的task更新为线程2的task,线程1再执行时没有了task,不会notifyAll(),就会一直等待下去了。 |
CopyRight 2018-2019 实验室设备网 版权所有 |