修复bug1-mqtt断开重连时,topic没有订阅。

修复bug2-上报property数据,如果数据解析异常,客户端将断开。
This commit is contained in:
guanshubiao
2022-03-18 14:45:07 +08:00
parent 9b2c23f51a
commit 3cd360e429
3 changed files with 34 additions and 18 deletions

View File

@@ -12,7 +12,7 @@ import org.springframework.stereotype.Component;
* @Description 消费监听类
*/
@Component
public class EmqxCallback implements MqttCallback {
public class EmqxCallback implements MqttCallbackExtended {
private static final Logger logger = LoggerFactory.getLogger(EmqxCallback.class);
@Autowired
@@ -24,17 +24,9 @@ public class EmqxCallback implements MqttCallback {
@Override
public void connectionLost(Throwable throwable) {
try {
// 重连mqtt
while(true) {
logger.info("mqtt连接断开重新连接中...");
EmqxClient.client.reconnect();
Thread.sleep(10000);
if(EmqxClient.client.isConnected()){
logger.info("mqtt已经重新连接");
break;
}
}
} catch (MqttException | InterruptedException e) {
logger.info("mqtt断开连接--");
//EmqxService.client 添加配置 options.setAutomaticReconnect(true);会自动重连
} catch (Exception e) {
// e.printStackTrace();
logger.error("发生错误:"+e.getMessage());
}
@@ -47,8 +39,23 @@ public class EmqxCallback implements MqttCallback {
emqxService.subscribeCallback(topic,mqttMessage);
}
/**
* 发布消息后到达MQTT服务器服务器回调消息接收
* @param iMqttDeliveryToken
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
// 消息到达 MQTT 代理时触发的事件
}
/**
* 监听mqtt连接消息
* @param reconnect
* @param serverURI
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
logger.info("mqtt已经重新连接");
//连接后,可以在此做初始化事件,或订阅
}
}

View File

@@ -43,6 +43,11 @@ public class EmqxClient {
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
//设置自动重新连接
options.setAutomaticReconnect(true);
/*设置为false断开连接后不清除session重连后仍然是之前的session
保留订阅的主题,能接受到离线期间的消息*/
options.setCleanSession(false);
EmqxClient.client=client;
client.setCallback(emqxCallback);
clientConnect(options,client);

View File

@@ -118,12 +118,16 @@ public class EmqxService {
* @param message
*/
private void reportProperty(Long productId,String deviceNum,String message,boolean isShadow){
try {
List<ThingsModelValueRemarkItem> thingsModelValueRemarkItems=JSON.parseArray(message, ThingsModelValueRemarkItem.class);
ThingsModelValuesInput input=new ThingsModelValuesInput();
input.setProductId(productId);
input.setDeviceNumber(deviceNum);
input.setThingsModelValueRemarkItem(thingsModelValueRemarkItems);
deviceService.reportDeviceThingsModelValue(input,1,isShadow);
}catch (Exception e){
logger.error("接收属性数据,解析数据时异常 message={}",e.getMessage());
}
}
/**