mirror of
https://gitee.com/beecue/fastbee.git
synced 2025-12-17 16:36:03 +08:00
@@ -12,7 +12,7 @@ import org.springframework.stereotype.Component;
|
||||
* @Description 消费监听类
|
||||
*/
|
||||
@Component
|
||||
public class EmqxCallback implements MqttCallback {
|
||||
public class EmqxCallback implements MqttCallbackExtended {
|
||||
private static final Logger logger = LoggerFactory.getLogger(EmqxCallback.class);
|
||||
|
||||
@Autowired
|
||||
@@ -24,17 +24,9 @@ public class EmqxCallback implements MqttCallback {
|
||||
@Override
|
||||
public void connectionLost(Throwable throwable) {
|
||||
try {
|
||||
// 重连mqtt
|
||||
while(true) {
|
||||
logger.info("mqtt连接断开,重新连接中...");
|
||||
EmqxClient.client.reconnect();
|
||||
Thread.sleep(10000);
|
||||
if(EmqxClient.client.isConnected()){
|
||||
logger.info("mqtt已经重新连接");
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (MqttException | InterruptedException e) {
|
||||
logger.info("mqtt断开连接--");
|
||||
//EmqxService.client 添加配置 options.setAutomaticReconnect(true);会自动重连
|
||||
} catch (Exception e) {
|
||||
// e.printStackTrace();
|
||||
logger.error("发生错误:"+e.getMessage());
|
||||
}
|
||||
@@ -47,8 +39,23 @@ public class EmqxCallback implements MqttCallback {
|
||||
emqxService.subscribeCallback(topic,mqttMessage);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布消息后,到达MQTT服务器,服务器回调消息接收
|
||||
* @param iMqttDeliveryToken
|
||||
*/
|
||||
@Override
|
||||
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
||||
// 消息到达 MQTT 代理时触发的事件
|
||||
}
|
||||
|
||||
/**
|
||||
* 监听mqtt连接消息
|
||||
* @param reconnect
|
||||
* @param serverURI
|
||||
*/
|
||||
@Override
|
||||
public void connectComplete(boolean reconnect, String serverURI) {
|
||||
logger.info("mqtt已经重新连接!!");
|
||||
//连接后,可以在此做初始化事件,或订阅
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +43,11 @@ public class EmqxClient {
|
||||
options.setPassword(password.toCharArray());
|
||||
options.setConnectionTimeout(timeout);
|
||||
options.setKeepAliveInterval(keepalive);
|
||||
//设置自动重新连接
|
||||
options.setAutomaticReconnect(true);
|
||||
/*设置为false,断开连接后,不清除session,重连后仍然是之前的session,
|
||||
保留订阅的主题,能接受到离线期间的消息*/
|
||||
options.setCleanSession(false);
|
||||
EmqxClient.client=client;
|
||||
client.setCallback(emqxCallback);
|
||||
clientConnect(options,client);
|
||||
|
||||
@@ -118,12 +118,16 @@ public class EmqxService {
|
||||
* @param message
|
||||
*/
|
||||
private void reportProperty(Long productId,String deviceNum,String message,boolean isShadow){
|
||||
try {
|
||||
List<ThingsModelValueRemarkItem> thingsModelValueRemarkItems=JSON.parseArray(message, ThingsModelValueRemarkItem.class);
|
||||
ThingsModelValuesInput input=new ThingsModelValuesInput();
|
||||
input.setProductId(productId);
|
||||
input.setDeviceNumber(deviceNum);
|
||||
input.setThingsModelValueRemarkItem(thingsModelValueRemarkItems);
|
||||
deviceService.reportDeviceThingsModelValue(input,1,isShadow);
|
||||
}catch (Exception e){
|
||||
logger.error("接收属性数据,解析数据时异常 message={}",e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user