From 3cd360e429de4f4f010797d33dadf39544b10090 Mon Sep 17 00:00:00 2001 From: guanshubiao Date: Fri, 18 Mar 2022 14:45:07 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dbug1-mqtt=E6=96=AD=E5=BC=80?= =?UTF-8?q?=E9=87=8D=E8=BF=9E=E6=97=B6=EF=BC=8Ctopic=E6=B2=A1=E6=9C=89?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E3=80=82=20=E4=BF=AE=E5=A4=8Dbug2-=E4=B8=8A?= =?UTF-8?q?=E6=8A=A5property=E6=95=B0=E6=8D=AE=EF=BC=8C=E5=A6=82=E6=9E=9C?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E8=A7=A3=E6=9E=90=E5=BC=82=E5=B8=B8=EF=BC=8C?= =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E5=B0=86=E6=96=AD=E5=BC=80=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/ruoyi/iot/mqtt/EmqxCallback.java | 31 ++++++++++++------- .../java/com/ruoyi/iot/mqtt/EmqxClient.java | 5 +++ .../java/com/ruoyi/iot/mqtt/EmqxService.java | 16 ++++++---- 3 files changed, 34 insertions(+), 18 deletions(-) diff --git a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxCallback.java b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxCallback.java index 6d9abae5..f958999e 100644 --- a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxCallback.java +++ b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxCallback.java @@ -12,7 +12,7 @@ import org.springframework.stereotype.Component; * @Description 消费监听类 */ @Component -public class EmqxCallback implements MqttCallback { +public class EmqxCallback implements MqttCallbackExtended { private static final Logger logger = LoggerFactory.getLogger(EmqxCallback.class); @Autowired @@ -24,17 +24,9 @@ public class EmqxCallback implements MqttCallback { @Override public void connectionLost(Throwable throwable) { try { - // 重连mqtt - while(true) { - logger.info("mqtt连接断开,重新连接中..."); - EmqxClient.client.reconnect(); - Thread.sleep(10000); - if(EmqxClient.client.isConnected()){ - logger.info("mqtt已经重新连接"); - break; - } - } - } catch (MqttException | InterruptedException e) { + logger.info("mqtt断开连接--"); + //EmqxService.client 添加配置 options.setAutomaticReconnect(true);会自动重连 + } catch (Exception e) { // e.printStackTrace(); logger.error("发生错误:"+e.getMessage()); } @@ -47,8 +39,23 @@ public class EmqxCallback implements MqttCallback { emqxService.subscribeCallback(topic,mqttMessage); } + /** + * 发布消息后,到达MQTT服务器,服务器回调消息接收 + * @param iMqttDeliveryToken + */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { // 消息到达 MQTT 代理时触发的事件 } + + /** + * 监听mqtt连接消息 + * @param reconnect + * @param serverURI + */ + @Override + public void connectComplete(boolean reconnect, String serverURI) { + logger.info("mqtt已经重新连接!!"); + //连接后,可以在此做初始化事件,或订阅 + } } diff --git a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxClient.java b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxClient.java index 56a4afb5..9aad218e 100644 --- a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxClient.java +++ b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxClient.java @@ -43,6 +43,11 @@ public class EmqxClient { options.setPassword(password.toCharArray()); options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); + //设置自动重新连接 + options.setAutomaticReconnect(true); + /*设置为false,断开连接后,不清除session,重连后仍然是之前的session, + 保留订阅的主题,能接受到离线期间的消息*/ + options.setCleanSession(false); EmqxClient.client=client; client.setCallback(emqxCallback); clientConnect(options,client); diff --git a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxService.java b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxService.java index 9c2b5ba0..42d8cd78 100644 --- a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxService.java +++ b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxService.java @@ -118,12 +118,16 @@ public class EmqxService { * @param message */ private void reportProperty(Long productId,String deviceNum,String message,boolean isShadow){ - List thingsModelValueRemarkItems=JSON.parseArray(message, ThingsModelValueRemarkItem.class); - ThingsModelValuesInput input=new ThingsModelValuesInput(); - input.setProductId(productId); - input.setDeviceNumber(deviceNum); - input.setThingsModelValueRemarkItem(thingsModelValueRemarkItems); - deviceService.reportDeviceThingsModelValue(input,1,isShadow); + try { + List thingsModelValueRemarkItems=JSON.parseArray(message, ThingsModelValueRemarkItem.class); + ThingsModelValuesInput input=new ThingsModelValuesInput(); + input.setProductId(productId); + input.setDeviceNumber(deviceNum); + input.setThingsModelValueRemarkItem(thingsModelValueRemarkItems); + deviceService.reportDeviceThingsModelValue(input,1,isShadow); + }catch (Exception e){ + logger.error("接收属性数据,解析数据时异常 message={}",e.getMessage()); + } } /**