增加定时更新设备状态任务

This commit is contained in:
kerwincui
2025-03-26 17:50:29 +08:00
parent 38e236a8ff
commit 18568aa663
8 changed files with 129 additions and 8 deletions

View File

@@ -6,6 +6,7 @@ import com.fastbee.common.core.mq.message.DeviceDownMessage;
import com.fastbee.common.core.mq.message.InstructionsMessage; import com.fastbee.common.core.mq.message.InstructionsMessage;
import com.fastbee.common.core.mq.ota.OtaUpgradeBo; import com.fastbee.common.core.mq.ota.OtaUpgradeBo;
import com.fastbee.common.core.thingsModel.ThingsModelSimpleItem; import com.fastbee.common.core.thingsModel.ThingsModelSimpleItem;
import com.fastbee.common.enums.DeviceStatus;
import com.fastbee.common.enums.TopicType; import com.fastbee.common.enums.TopicType;
import com.fastbee.iot.domain.Device; import com.fastbee.iot.domain.Device;
import com.fastbee.mq.model.ReportDataBo; import com.fastbee.mq.model.ReportDataBo;
@@ -74,5 +75,13 @@ public interface IMqttMessagePublish {
*/ */
public Device deviceSynchronization(String deviceNumber); public Device deviceSynchronization(String deviceNumber);
/**
* 推送设备状态
*
* @param device 设备
* @param status 状态
*/
public void pushDeviceStatus(Device device, DeviceStatus status) ;
} }

View File

@@ -1,7 +1,12 @@
package com.fastbee.data.service.impl; package com.fastbee.data.service.impl;
import com.fastbee.base.session.Session;
import com.fastbee.common.enums.DeviceStatus;
import com.fastbee.iot.domain.Device; import com.fastbee.iot.domain.Device;
import com.fastbee.iot.model.DeviceStatusVO;
import com.fastbee.iot.service.IDeviceService; import com.fastbee.iot.service.IDeviceService;
import com.fastbee.mq.service.IMqttMessagePublish;
import com.fastbee.mqtt.manager.SessionManger;
import com.fastbee.sip.domain.SipDevice; import com.fastbee.sip.domain.SipDevice;
import com.fastbee.sip.domain.SipDeviceChannel; import com.fastbee.sip.domain.SipDeviceChannel;
import com.fastbee.sip.enums.DeviceChannelStatus; import com.fastbee.sip.enums.DeviceChannelStatus;
@@ -9,7 +14,9 @@ import com.fastbee.sip.mapper.SipDeviceChannelMapper;
import com.fastbee.sip.mapper.SipDeviceMapper; import com.fastbee.sip.mapper.SipDeviceMapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.List; import java.util.List;
@@ -28,6 +35,12 @@ public class DeviceJob {
@Autowired @Autowired
private SipDeviceChannelMapper sipDeviceChannelMapper; private SipDeviceChannelMapper sipDeviceChannelMapper;
@Resource
private IMqttMessagePublish mqttMessagePublish;
@Value("${server.broker.enabled}")
private Boolean enabled;
public void updateSipDeviceOnlineStatus(Integer timeout) { public void updateSipDeviceOnlineStatus(Integer timeout) {
List<SipDevice> devs = sipDeviceMapper.selectOfflineSipDevice(timeout); List<SipDevice> devs = sipDeviceMapper.selectOfflineSipDevice(timeout);
devs.forEach(item -> { devs.forEach(item -> {
@@ -48,4 +61,37 @@ public class DeviceJob {
} }
}); });
} }
/**
* 定期同步设备状态
* 1.将异常在线设备变更为离线状态
* 2.将离线设备但实际在线设备变更为在线
*/
public void syncDeviceStatus() {
if (enabled) {
//获取所有已激活并不是禁用的设备
List<DeviceStatusVO> deviceStatusVOList = deviceService.selectDeviceActive();
if (!CollectionUtils.isEmpty(deviceStatusVOList)) {
for (DeviceStatusVO statusVO : deviceStatusVOList) {
Session session = SessionManger.getSession(statusVO.getSerialNumber());
Device device = new Device();
device.setSerialNumber(statusVO.getSerialNumber());
device.setRssi(statusVO.getRssi());
device.setProductId(statusVO.getProductId());
device.setIsShadow(statusVO.getIsShadow());
// 如果session中设备在线数据库状态离线 ,则更新设备的状态为在线
if (!Objects.isNull(session) && statusVO.getStatus() == DeviceStatus.OFFLINE.getType()) {
device.setStatus(DeviceStatus.ONLINE.getType());
deviceService.updateDeviceStatus(device);
mqttMessagePublish.pushDeviceStatus(device, DeviceStatus.ONLINE);
}
if (Objects.isNull(session) && statusVO.getStatus() == DeviceStatus.ONLINE.getType()) {
device.setStatus(DeviceStatus.OFFLINE.getType());
deviceService.updateDeviceStatus(device);
mqttMessagePublish.pushDeviceStatus(device, DeviceStatus.OFFLINE);
}
}
}
}
}
} }

View File

@@ -13,6 +13,7 @@ import com.fastbee.common.core.mq.ota.OtaUpgradeBo;
import com.fastbee.common.core.protocol.modbus.ModbusCode; import com.fastbee.common.core.protocol.modbus.ModbusCode;
import com.fastbee.common.core.redis.RedisCache; import com.fastbee.common.core.redis.RedisCache;
import com.fastbee.common.core.thingsModel.ThingsModelSimpleItem; import com.fastbee.common.core.thingsModel.ThingsModelSimpleItem;
import com.fastbee.common.enums.DeviceStatus;
import com.fastbee.common.enums.FunctionReplyStatus; import com.fastbee.common.enums.FunctionReplyStatus;
import com.fastbee.common.enums.ServerType; import com.fastbee.common.enums.ServerType;
import com.fastbee.common.enums.TopicType; import com.fastbee.common.enums.TopicType;
@@ -63,6 +64,10 @@ public class MqttMessagePublishImpl implements IMqttMessagePublish {
private IProductService productService; private IProductService productService;
@Resource @Resource
private PubMqttClient mqttClient; private PubMqttClient mqttClient;
@Resource
private PubMqttClient pubMqttClient;
@Resource @Resource
private TopicsUtils topicsUtils; private TopicsUtils topicsUtils;
@Resource @Resource
@@ -78,6 +83,7 @@ public class MqttMessagePublishImpl implements IMqttMessagePublish {
private IThingsModelService thingsModelService; private IThingsModelService thingsModelService;
@Resource @Resource
private JsonProtocolService jsonProtocolService; private JsonProtocolService jsonProtocolService;
private SnowflakeIdWorker snowflakeIdWorker = new SnowflakeIdWorker(3); private SnowflakeIdWorker snowflakeIdWorker = new SnowflakeIdWorker(3);
@Resource @Resource
@@ -368,6 +374,18 @@ public class MqttMessagePublishImpl implements IMqttMessagePublish {
return result; return result;
} }
/**
* 推送设备状态
*
* @param device 设备
* @param status 状态
*/
public void pushDeviceStatus(Device device, DeviceStatus status) {
String message = "{\"status\":" + status.getType() + ",\"isShadow\":" + device.getIsShadow() + ",\"rssi\":" + device.getRssi() + "}";
String topic = topicsUtils.buildTopic(device.getProductId(), device.getSerialNumber(), TopicType.STATUS_POST);
pubMqttClient.publish(0, false, topic, message);
}
/** /**
* 搭建消息 * 搭建消息

View File

@@ -2,15 +2,8 @@ package com.fastbee.iot.mapper;
import com.fastbee.common.core.thingsModel.ThingsModelValuesInput; import com.fastbee.common.core.thingsModel.ThingsModelValuesInput;
import com.fastbee.iot.domain.Device; import com.fastbee.iot.domain.Device;
import com.fastbee.iot.model.AuthenticateInputModel; import com.fastbee.iot.model.*;
import com.fastbee.iot.model.DeviceAllShortOutput;
import com.fastbee.iot.model.DeviceMqttVO;
import com.fastbee.iot.model.DeviceRelateAlertLogVO;
import com.fastbee.iot.model.DeviceShortOutput;
import com.fastbee.iot.model.DeviceStatistic;
import com.fastbee.iot.model.ProductAuthenticateModel;
import com.fastbee.iot.model.ThingsModels.ThingsModelValuesOutput; import com.fastbee.iot.model.ThingsModels.ThingsModelValuesOutput;
import com.fastbee.iot.model.UserIdDeviceIdModel;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
@@ -249,6 +242,12 @@ public interface DeviceMapper
*/ */
public List<Device> selectDevicesByProductId(@Param("productId") Long productId,@Param("hasSub") Integer hasSub); public List<Device> selectDevicesByProductId(@Param("productId") Long productId,@Param("hasSub") Integer hasSub);
/**
* 获取所有已经激活并不是禁用的设备
* @return
*/
List<DeviceStatusVO> selectDeviceActive();
/** /**
* 查询子设备总数 * 查询子设备总数
* @param gwDevCode 网关编号 * @param gwDevCode 网关编号

View File

@@ -0,0 +1,26 @@
package com.fastbee.iot.model;
import lombok.Data;
/**
* 设备状态
* @author gsb
* @date 2024/4/11 10:39
*/
@Data
public class DeviceStatusVO {
private String serialNumber;
private Integer status;
private String transport;
private Long productId;
private Integer deviceType;
private Integer rssi;
private Integer isShadow;
}

View File

@@ -227,6 +227,12 @@ public interface IDeviceService
*/ */
public List<Device> selectDevicesByProductId(Long productId,Integer hasSub); public List<Device> selectDevicesByProductId(Long productId,Integer hasSub);
/**
* 获取所有已经激活并不是禁用的设备
* @return
*/
List<DeviceStatusVO> selectDeviceActive();
/** /**
* 查询子设备总数 * 查询子设备总数
* @param gwDevCode 网关编号 * @param gwDevCode 网关编号

View File

@@ -1082,6 +1082,16 @@ public class DeviceServiceImpl implements IDeviceService {
return deviceMapper.selectDevicesByProductId(productId, hasSub); return deviceMapper.selectDevicesByProductId(productId, hasSub);
} }
/**
* 获取所有已经激活并不是禁用的设备
*
* @return
*/
@Override
public List<DeviceStatusVO> selectDeviceActive() {
return deviceMapper.selectDeviceActive();
}
/** /**
* 查询子设备总数 * 查询子设备总数
* *

View File

@@ -565,6 +565,13 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
</select> </select>
<select id="selectDeviceActive" resultType="com.fastbee.iot.model.DeviceStatusVO">
select d.status , d.serial_number as serialNumber, d.rssi,d.is_shadow, d.product_id
from iot_device d inner join iot_product p on d.product_id = p.product_id
where d.status in (3,4) and p.transport not in ('GB28181')
and d.del_flag = '0' and p.del_flag = '0'
</select>
<select id="getSubDeviceCount" resultType="java.lang.Integer"> <select id="getSubDeviceCount" resultType="java.lang.Integer">
select count(*) from iot_device d where d.gw_dev_code = #{gwDevCode,jdbcType=VARCHAR} select count(*) from iot_device d where d.gw_dev_code = #{gwDevCode,jdbcType=VARCHAR}
</select> </select>