diff --git a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxCallback.java b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxCallback.java index 6d9abae5..f958999e 100644 --- a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxCallback.java +++ b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxCallback.java @@ -12,7 +12,7 @@ import org.springframework.stereotype.Component; * @Description 消费监听类 */ @Component -public class EmqxCallback implements MqttCallback { +public class EmqxCallback implements MqttCallbackExtended { private static final Logger logger = LoggerFactory.getLogger(EmqxCallback.class); @Autowired @@ -24,17 +24,9 @@ public class EmqxCallback implements MqttCallback { @Override public void connectionLost(Throwable throwable) { try { - // 重连mqtt - while(true) { - logger.info("mqtt连接断开,重新连接中..."); - EmqxClient.client.reconnect(); - Thread.sleep(10000); - if(EmqxClient.client.isConnected()){ - logger.info("mqtt已经重新连接"); - break; - } - } - } catch (MqttException | InterruptedException e) { + logger.info("mqtt断开连接--"); + //EmqxService.client 添加配置 options.setAutomaticReconnect(true);会自动重连 + } catch (Exception e) { // e.printStackTrace(); logger.error("发生错误:"+e.getMessage()); } @@ -47,8 +39,23 @@ public class EmqxCallback implements MqttCallback { emqxService.subscribeCallback(topic,mqttMessage); } + /** + * 发布消息后,到达MQTT服务器,服务器回调消息接收 + * @param iMqttDeliveryToken + */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { // 消息到达 MQTT 代理时触发的事件 } + + /** + * 监听mqtt连接消息 + * @param reconnect + * @param serverURI + */ + @Override + public void connectComplete(boolean reconnect, String serverURI) { + logger.info("mqtt已经重新连接!!"); + //连接后,可以在此做初始化事件,或订阅 + } } diff --git a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxClient.java b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxClient.java index 56a4afb5..9aad218e 100644 --- a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxClient.java +++ b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxClient.java @@ -43,6 +43,11 @@ public class EmqxClient { options.setPassword(password.toCharArray()); options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); + //设置自动重新连接 + options.setAutomaticReconnect(true); + /*设置为false,断开连接后,不清除session,重连后仍然是之前的session, + 保留订阅的主题,能接受到离线期间的消息*/ + options.setCleanSession(false); EmqxClient.client=client; client.setCallback(emqxCallback); clientConnect(options,client); diff --git a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxService.java b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxService.java index 9c2b5ba0..42d8cd78 100644 --- a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxService.java +++ b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxService.java @@ -118,12 +118,16 @@ public class EmqxService { * @param message */ private void reportProperty(Long productId,String deviceNum,String message,boolean isShadow){ - List thingsModelValueRemarkItems=JSON.parseArray(message, ThingsModelValueRemarkItem.class); - ThingsModelValuesInput input=new ThingsModelValuesInput(); - input.setProductId(productId); - input.setDeviceNumber(deviceNum); - input.setThingsModelValueRemarkItem(thingsModelValueRemarkItems); - deviceService.reportDeviceThingsModelValue(input,1,isShadow); + try { + List thingsModelValueRemarkItems=JSON.parseArray(message, ThingsModelValueRemarkItem.class); + ThingsModelValuesInput input=new ThingsModelValuesInput(); + input.setProductId(productId); + input.setDeviceNumber(deviceNum); + input.setThingsModelValueRemarkItem(thingsModelValueRemarkItems); + deviceService.reportDeviceThingsModelValue(input,1,isShadow); + }catch (Exception e){ + logger.error("接收属性数据,解析数据时异常 message={}",e.getMessage()); + } } /**