This commit is contained in:
kerwincui
2024-03-17 14:59:23 +08:00
parent 3d44f4674c
commit 5539c1b6af
999 changed files with 115642 additions and 10757 deletions

View File

@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>fastbee-gateway</artifactId>
<groupId>com.fastbee</groupId>
<version>3.8.5</version>
</parent>
<artifactId>fastbee-mq</artifactId>
<dependencies>
<dependency>
<groupId>com.fastbee</groupId>
<artifactId>fastbee-protocol-collect</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,17 @@
package com.fastbee.mq.config;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Configuration;
/**
* mq集群配置
* @author gsb
* @date 2022/10/10 8:27
*/
@Configuration
@ConditionalOnExpression("${cluster.enable:false}")
public class MqConfig {
}

View File

@@ -0,0 +1,40 @@
package com.fastbee.mq.model;
import com.fastbee.common.core.thingsModel.ThingsModelSimpleItem;
import lombok.Data;
import java.util.List;
/**
* 上报数据模型bo
* @author bill
*/
@Data
public class ReportDataBo {
/**产品id*/
private Long productId;
/**设备编号*/
private String serialNumber;
/**上报消息*/
private String message;
/**上报的数据*/
private List<ThingsModelSimpleItem> dataList;
/**设备影子*/
private boolean isShadow;
/**
* 物模型类型
* 1=属性2=功能3=事件4=设备升级5=设备上线6=设备下线
*/
private int type;
/**是否执行规则引擎*/
private boolean isRuleEngine;
/**从机编号*/
private Integer slaveId;
private Long userId;
private String userName;
private String deviceName;
}

View File

@@ -0,0 +1,58 @@
package com.fastbee.mq.mqttClient;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/** mqtt配置信息*/
@Data
@Component
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttClientConfig {
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 连接地址
*/
private String hostUrl;
/**
* 客户Id
*/
private String clientId;
/**
* 默认连接话题
*/
private String defaultTopic;
/**
* 超时时间
*/
private int timeout;
/**
* 保持连接数
*/
private int keepalive;
/**是否清除session*/
private boolean clearSession;
/**是否共享订阅*/
private boolean isShared;
/**分组共享订阅*/
private boolean isSharedGroup;
/**
* true: 使用netty搭建的mqttBroker false: 使用emq
*/
@Value("${server.broker.enabled}")
private Boolean enabled;
}

View File

@@ -0,0 +1,68 @@
package com.fastbee.mq.mqttClient;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.mq.DeviceReportBo;
import com.fastbee.common.enums.ServerType;
import com.fastbee.common.utils.DateUtils;
import com.fastbee.common.utils.gateway.mq.TopicsPost;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.mq.redischannel.producer.MessageProducer;
import com.fastbee.mq.service.IDeviceReportMessageService;
import com.fastbee.mq.service.IMessagePublishService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Arrays;
@Component
@Slf4j
public class MqttService {
@Resource
private TopicsUtils topicsUtils;
@Resource
private IDeviceReportMessageService deviceReportMessageService;
public void subscribe(MqttAsyncClient client) throws MqttException {
TopicsPost allPost = topicsUtils.getAllPost();
client.subscribe(allPost.getTopics(), allPost.getQos());
log.info("mqtt监控主题,{}", Arrays.asList(allPost.getTopics()));
}
/**
* 消息回调方法
*
* @param topic 主题
* @param mqttMessage 消息体
*/
public void subscribeCallback(String topic, MqttMessage mqttMessage) {
String message = new String(mqttMessage.getPayload());
log.info("接收消息主题 : " + topic);
log.info("接收消息Qos : " + mqttMessage.getQos());
log.info("接收消息内容 : " + message);
String serialNumber = topicsUtils.parseSerialNumber(topic);
Long productId = topicsUtils.parseProductId(topic);
String name = topicsUtils.parseTopicName(topic);
DeviceReportBo reportBo = DeviceReportBo.builder()
.serialNumber(serialNumber)
.productId(productId)
.data(mqttMessage.getPayload())
.platformDate(DateUtils.getNowDate())
.topicName(topic)
.serverType(ServerType.MQTT)
.build();
if (name.startsWith("property")) {
deviceReportMessageService.parseReportMsg(reportBo);
}
}
}

View File

@@ -0,0 +1,117 @@
package com.fastbee.mq.mqttClient;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* mqtt客户端回调
*/
@Slf4j
@Component
@Data
@NoArgsConstructor
public class PubMqttCallBack implements MqttCallbackExtended {
/**
* mqtt客户端
*/
private MqttAsyncClient client;
/**
* 创建客户端参数
*/
private MqttConnectOptions options;
@Resource
private MqttService mqttService;
private Boolean enabled;
public PubMqttCallBack(MqttAsyncClient client, MqttConnectOptions options,Boolean enabled) {
this.client = client;
this.options = options;
this.enabled = enabled;
}
/**
* mqtt客户端连接
*
* @param cause 错误
*/
@Override
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
log.debug("=>mqtt 连接丢失", cause);
int count = 1;
// int sleepTime = 0;
boolean willConnect = true;
while (willConnect) {
try {
Thread.sleep(1000);
log.debug("=>连接[{}]断开,尝试重连第{}次", this.client.getServerURI(), count++);
this.client.connect(this.options);
log.debug("=>重连成功");
willConnect = false;
} catch (Exception e) {
log.error("=>重连异常", e);
}
}
}
/**
* 客户端订阅主题回调消息
*
* @param topic 主题
* @param message 消息
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
try {
mqttService.subscribeCallback(topic, message);
} catch (Exception e) {
log.warn("mqtt 订阅消息异常", e);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
/**
* 监听mqtt连接消息
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("MQTT内部客户端已经连接!");
System.out.print("" +
" * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * \n" +
" * _⚲_⚲_ ______ _ ____ * \n" +
" * | / \\ | | ____| | | | _ \\ * \n" +
" * | | | ● | | | | |__ __ _ ___| |_ | |_) | ___ ___ * \n" +
" * | \\ / | | __/ _` / __| __| | _ < / _ \\/ _ \\ * \n" +
" * \\ / | | | (_| \\__ \\ |_ | |_) | __/ __/ * \n" +
" * V |_| \\__,_|___/\\__| |____/ \\___|\\___| * \n" +
" * * \n"+
" * * * * * * * * * * * * FastBee物联网平台[✔启动成功] * * * * * * * * * * * * \n");
//连接后订阅, enable为false表示使用emq
if (!enabled) {
try {
mqttService.subscribe(client);
} catch (MqttException e) {
log.error("=>订阅主题失败 error={}", e.getMessage());
}
}
}
}

View File

@@ -0,0 +1,225 @@
package com.fastbee.mq.mqttClient;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.redis.RedisCache;
import com.fastbee.common.enums.FunctionReplyStatus;
import com.fastbee.common.exception.ServiceException;
import com.fastbee.iot.domain.FunctionLog;
import com.fastbee.iot.service.IFunctionLogService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 发布服务mqtt客户端
*/
@Component
@Slf4j
public class PubMqttClient {
@Resource
private MqttClientConfig mqttConfig;
@Resource(name = "pubMqttCallBack")
private PubMqttCallBack mqttCallBack;
/**
* 连接配置
*/
private MqttConnectOptions options;
/**
* MQTT异步客户端
*/
private MqttAsyncClient client;
/**
* 是否连接标记
*/
private boolean isConnected = false;
@Resource
private RedisCache redisCache;
@Resource
private IFunctionLogService functionLogService;
/**
* 启动MQTT客户端
*/
public synchronized void initialize() {
try {
setOptions();
createClient();
while (!client.isConnected()) {
IMqttToken token = client.connect(options);
if(token != null && token.isComplete()) {
log.debug("=>内部MQTT客户端启动成功");
this.isConnected = true;
break;
}
log.debug("=>内部mqtt客户端连接中...");
Thread.sleep(20000);
}
} catch (MqttException ex) {
log.error("=>MQTT客户端初始化异常", ex);
} catch (Exception e) {
log.error("=>连接MQTT服务器异常", e);
this.isConnected = false;
}
}
public boolean isConnected() {
return this.isConnected;
}
private void createClient() {
try {
if (client == null) {
/*host为主机名clientId是连接MQTT的客户端ID*/
client = new MqttAsyncClient(mqttConfig.getHostUrl(), getClientId(), new MemoryPersistence());
//设置回调函数
client.setCallback(mqttCallBack);
mqttCallBack.setClient(client);
mqttCallBack.setOptions(this.options);
mqttCallBack.setEnabled(mqttConfig.getEnabled());
}
} catch (Exception e) {
log.error("=>mqtt客户端创建错误");
}
}
/**
* 设置连接属性
*/
private void setOptions() {
if (options != null) {
options = null;
}
options = new MqttConnectOptions();
options.setConnectionTimeout(mqttConfig.getTimeout());
options.setKeepAliveInterval(mqttConfig.getKeepalive());
options.setUserName(mqttConfig.getUsername());
options.setPassword(mqttConfig.getPassword().toCharArray());
//设置自动重新连接
options.setAutomaticReconnect(true);
/*设置为false断开连接不清除session重连后还是原来的session
保留订阅的主题,能接收离线期间的消息*/
options.setCleanSession(true);
}
/**
* 断开与mqtt的连接
*/
public synchronized void disconnect() {
//判断客户端是否null 是否连接
if (client != null && client.isConnected()) {
try {
IMqttToken token = client.disconnect();
token.waitForCompletion();
} catch (MqttException e) {
log.error("=>断开mqtt连接发生错误 message={}", e.getMessage());
throw new ServiceException("断开mqtt连接发生错误" + e.getMessage());
}
}
client = null;
}
/**
* 重新连接MQTT
*/
public synchronized void refresh() {
disconnect();
initialize();
}
/**
* 拼接客户端id
*/
public final String getClientId() {
return FastBeeConstant.SERVER.WM_PREFIX + System.currentTimeMillis();
}
/**
* 发布qos=1非持久化
*/
public void publish(String topic, byte[] pushMessage, FunctionLog log) {
try {
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SEND_TOTAL, -1L);
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SEND_TODAY, 60 * 60 * 24);
publish(pushMessage, topic, false, 0);
if (null != log) {
//存储服务下发成功
log.setResultMsg(FunctionReplyStatus.NORELY.getMessage());
log.setResultCode(FunctionReplyStatus.NORELY.getCode());
functionLogService.insertFunctionLog(log);
}
} catch (Exception e) {
if (null != log) {
//服务下发失败存储
log.setResultMsg(FunctionReplyStatus.FAIl.getMessage() + "原因: " + e.getMessage());
log.setResultCode(FunctionReplyStatus.FAIl.getCode());
functionLogService.insertFunctionLog(log);
}
}
}
/**
* 发布主题
*
* @param message payload消息体
* @param topic 主题
* @param retained 是否保留消息
* @param qos 消息质量
* Qos1消息发送一次不确保
* Qos2至少分发一次服务器确保接收消息进行确认
* Qos3只分发一次确保消息送达和只传递一次
*/
public void publish(byte[] message, String topic, boolean retained, int qos) {
//设置mqtt消息
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message);
IMqttDeliveryToken token;
try {
token = client.publish(topic, mqttMessage);
token.waitForCompletion();
} catch (MqttPersistenceException e) {
log.error("=>发布主题时发生错误 topic={},message={}", topic, e.getMessage());
throw new ServiceException(e.getMessage());
} catch (MqttException ex) {
throw new ServiceException(ex.getMessage());
}
}
/**
* 发布
*
* @param qos 连接方式
* @param retained 是否保留
* @param topic 主题
* @param pushMessage 消息体
*/
public void publish(int qos, boolean retained, String topic, String pushMessage) {
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SEND_TOTAL, -1L);
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SEND_TODAY, 60 * 60 * 24);
log.info("发布主题[{}],发布消息[{}]" + topic,pushMessage);
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
try {
IMqttDeliveryToken token = client.publish(topic, message);
token.waitForCompletion();
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
log.error("=>发布主题时发生错误 topic={},message={}", topic, e.getMessage());
}
}
}

View File

@@ -0,0 +1,57 @@
package com.fastbee.mq.redischannel.config;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.mq.redischannel.consumer.RedisChannelConsume;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
* redisChannel配置
* @author gsb
* @date 2022/10/10 8:57
*/
@Configuration
@EnableCaching
@Slf4j
public class RedisConsumeConfig {
@Bean
@ConditionalOnProperty(prefix ="cluster", name = "type" ,havingValue = FastBeeConstant.MQTT.REDIS_CHANNEL,matchIfMissing = true)
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 可以添加多个 messageListener配置不同的交换机
container.addMessageListener(listenerAdapter, new PatternTopic(FastBeeConstant.CHANNEL.DEVICE_STATUS));
container.addMessageListener(listenerAdapter, new PatternTopic(FastBeeConstant.CHANNEL.PROP_READ));
container.addMessageListener(listenerAdapter, new PatternTopic(FastBeeConstant.CHANNEL.FUNCTION_INVOKE));
container.addMessageListener(listenerAdapter,new PatternTopic(FastBeeConstant.CHANNEL.UPGRADE));
return container;
}
/**配置消息监听类 默认监听方法onMessage*/
@Bean
@ConditionalOnProperty(prefix ="cluster", name = "type" ,havingValue = FastBeeConstant.MQTT.REDIS_CHANNEL,matchIfMissing = true)
MessageListenerAdapter listenerAdapter(RedisChannelConsume consume){
return new MessageListenerAdapter(consume,"onMessage");
}
@Bean
@ConditionalOnProperty(prefix ="cluster", name = "type" ,havingValue = FastBeeConstant.MQTT.REDIS_CHANNEL,matchIfMissing = true)
StringRedisTemplate template(RedisConnectionFactory connectionFactory){
return new StringRedisTemplate(connectionFactory);
}
}

View File

@@ -0,0 +1,34 @@
package com.fastbee.mq.redischannel.consumer;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.mq.DeviceReportBo;
import com.fastbee.mq.service.impl.DeviceOtherMsgHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author gsb
* @date 2023/2/27 14:33
*/
@Slf4j
@Component
public class DeviceOtherMsgConsumer {
@Resource
private DeviceOtherMsgHandler otherMsgHandler;
@Async(FastBeeConstant.TASK.DEVICE_OTHER_TASK)
public void consume(DeviceReportBo bo){
try {
//处理emq订阅的非 property/post 属性上报的消息 ,因为其他消息量小,放在一起处理
otherMsgHandler.messageHandler(bo);
}catch (Exception e){
log.error("=>设备其他消息处理出错",e);
}
}
}

View File

@@ -0,0 +1,33 @@
package com.fastbee.mq.redischannel.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
/**
* redisChannel消息监听
*
* @author gsb
* @date 2022/10/10 9:17
*/
@Component
@Slf4j
public class RedisChannelConsume implements MessageListener {
/**
* 监听推送消息
*/
@Override
public void onMessage(Message message, byte[] pattern) {
try {
/*获取channel*/
String channel = new String(message.getChannel());
/*获取消息*/
String body = new String(message.getBody());
} catch (Exception e) {
log.error("=>redisChannel处理消息异常,e", e);
}
}
}

View File

@@ -0,0 +1,35 @@
package com.fastbee.mq.redischannel.listen;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.mq.DeviceReportBo;
import com.fastbee.mq.redischannel.consumer.DeviceOtherMsgConsumer;
import com.fastbee.mq.redischannel.queue.DeviceOtherQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author gsb
* @date 2023/2/28 10:02
*/
@Slf4j
@Component
public class DeviceOtherListen {
@Resource
private DeviceOtherMsgConsumer otherMsgConsumer;
@Async(FastBeeConstant.TASK.DEVICE_OTHER_TASK)
public void listen(){
while (true){
try {
DeviceReportBo reportBo = DeviceOtherQueue.take();
otherMsgConsumer.consume(reportBo);
}catch (Exception e){
log.error("=>emq数据转发异常");
}
}
}
}

View File

@@ -0,0 +1,20 @@
package com.fastbee.mq.redischannel.producer;
import com.fastbee.common.core.mq.DeviceReportBo;
import com.fastbee.common.core.mq.DeviceStatusBo;
import com.fastbee.common.core.mq.MQSendMessageBo;
import com.fastbee.common.core.mq.ota.OtaUpgradeBo;
import com.fastbee.common.core.mq.message.DeviceDownMessage;
import com.fastbee.mq.redischannel.queue.*;
/**
*设备消息生产者 ,设备的消息发送通道
* @author bill
*/
public class MessageProducer {
public static void sendOtherMsg(DeviceReportBo bo){
DeviceOtherQueue.offer(bo);
}
}

View File

@@ -0,0 +1,25 @@
package com.fastbee.mq.redischannel.queue;
import com.fastbee.common.core.mq.DeviceReportBo;
import lombok.SneakyThrows;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author gsb
* @date 2022/10/10 10:13
*/
public class DeviceOtherQueue {
private static final LinkedBlockingQueue<DeviceReportBo> queue = new LinkedBlockingQueue<>();
/*元素加入队列,最后*/
public static void offer(DeviceReportBo dto){
queue.offer(dto);
}
/*取出队列元素 先进先出*/
@SneakyThrows
public static DeviceReportBo take(){
return queue.take();
}
}

View File

@@ -0,0 +1,32 @@
package com.fastbee.mq.service;
import com.fastbee.mq.model.ReportDataBo;
/**
* 客户端上报数据处理方法集合
* @author bill
*/
public interface IDataHandler {
/**
* 上报属性或功能处理
*
* @param bo 上报数据模型
*/
public void reportData(ReportDataBo bo);
/**
* 上报事件
*
* @param bo 上报数据模型
*/
public void reportEvent(ReportDataBo bo);
/**
* 上报设备信息
* @param bo 上报数据模型
*/
public void reportDevice(ReportDataBo bo);
}

View File

@@ -0,0 +1,35 @@
package com.fastbee.mq.service;
import com.fastbee.common.core.mq.DeviceReport;
import com.fastbee.common.core.mq.DeviceReportBo;
import com.fastbee.iot.domain.Device;
/**
* 处理设备上报数据解析
* @author gsb
* @date 2022/10/10 13:48
*/
public interface IDeviceReportMessageService {
/**
* 处理设备主动上报数据
* @param bo
*/
public void parseReportMsg(DeviceReportBo bo);
/**
* 构建消息
* @param bo
*/
public Device buildReport(DeviceReportBo bo);
/**
* 处理设备主动上报属性
*
* @param topicName
* @param message
*/
public void handlerReportMessage(DeviceReport message, String topicName);
}

View File

@@ -0,0 +1,21 @@
package com.fastbee.mq.service;
import com.fastbee.common.core.mq.InvokeReqDto;
import java.util.Map;
/**
* 设备指令下发接口
* @author gsb
* @date 2022/12/5 11:03
*/
public interface IFunctionInvoke {
/**
* 服务调用,设备不响应
* @param reqDto 服务下发对象
* @return 消息id messageId
*/
public String invokeNoReply(InvokeReqDto reqDto);
}

View File

@@ -0,0 +1,19 @@
package com.fastbee.mq.service;
import com.fastbee.common.core.mq.message.DeviceMessage;
/**
* 设备消息推送mq
* @author bill
*/
public interface IMessagePublishService {
/**
* 发布消息到mq
* @param message 设备消息
* @param channel 推送channel
*/
public void publish(Object message,String channel);
}

View File

@@ -0,0 +1,78 @@
package com.fastbee.mq.service;
import com.fastbee.common.core.mq.DeviceReportBo;
import com.fastbee.common.core.mq.MQSendMessageBo;
import com.fastbee.common.core.mq.message.DeviceDownMessage;
import com.fastbee.common.core.mq.message.InstructionsMessage;
import com.fastbee.common.core.mq.ota.OtaUpgradeBo;
import com.fastbee.common.core.thingsModel.ThingsModelSimpleItem;
import com.fastbee.common.enums.TopicType;
import com.fastbee.iot.domain.Device;
import com.fastbee.mq.model.ReportDataBo;
import java.util.List;
public interface IMqttMessagePublish {
/**
* 下发数据编码
*/
InstructionsMessage buildMessage(DeviceDownMessage downMessage, TopicType type);
/**
* 服务(指令)下发
*/
public void funcSend(MQSendMessageBo bo);
/**
* OTA升级下发
*/
public void upGradeOTA(OtaUpgradeBo bo);
public void sendFunctionMessage(DeviceReportBo bo);
/**
* 1.发布设备状态
*/
public void publishStatus(Long productId, String deviceNum, int deviceStatus, int isShadow, int rssi);
/**
* 2.发布设备信息
*/
public void publishInfo(Long productId, String deviceNum);
/**
* 3.发布时钟同步信息
*
* @param bo 数据模型
*/
public void publishNtp(ReportDataBo bo);
/**
* 4.发布属性
* delay 延时,秒为单位
*/
public void publishProperty(Long productId, String deviceNum, List<ThingsModelSimpleItem> thingsList, int delay);
/**
* 5.发布功能
* delay 延时,秒为单位
*/
public void publishFunction(Long productId, String deviceNum, List<ThingsModelSimpleItem> thingsList, int delay);
/**
* 设备数据同步
*
* @param deviceNumber 设备编号
* @return 设备
*/
public Device deviceSynchronization(String deviceNumber);
}

View File

@@ -0,0 +1,95 @@
package com.fastbee.mq.service.impl;
import com.fastbee.common.core.mq.DeviceReportBo;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.mq.model.ReportDataBo;
import com.fastbee.mq.service.IDataHandler;
import com.fastbee.mq.service.IMqttMessagePublish;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author gsb
* @date 2023/2/27 14:42
*/
@Component
@Slf4j
public class DeviceOtherMsgHandler {
@Resource
private TopicsUtils topicsUtils;
@Resource
private IDataHandler dataHandler;
@Resource
private IMqttMessagePublish messagePublish;
/**
* 非属性消息消息处理入口
* @param bo
*/
public void messageHandler(DeviceReportBo bo){
String type = "";
String name = topicsUtils.parseTopicName(bo.getTopicName());
ReportDataBo data = this.buildReportData(bo);
switch (name) {
case "info":
dataHandler.reportDevice(data);
break;
case "ntp":
messagePublish.publishNtp(data);
break;
// 接收 property/get 模拟设备数据
case "property":
type = topicsUtils.parseTopicName4(bo.getTopicName());
break;
case "function":
data.setShadow(false);
data.setType(2);
data.setRuleEngine(true);
dataHandler.reportData(data);
break;
case "event":
data.setType(3);
data.setRuleEngine(true);
dataHandler.reportEvent(data);
break;
case "property-offline":
data.setShadow(true);
data.setType(1);
dataHandler.reportData(data);
break;
case "function-offline":
data.setShadow(true);
data.setType(2);
dataHandler.reportData(data);
break;
case "property-online":
break;
case "function-online":
type = topicsUtils.parseTopicName4(bo.getTopicName());
if (type.equals("get")) {
log.info("function-online:{}",bo);
//处理功能下发
messagePublish.sendFunctionMessage(bo);
}
break;
}
}
/**组装数据*/
private ReportDataBo buildReportData(DeviceReportBo bo){
String message = new String(bo.getData());
log.info("收到设备信息[{}]",message);
Long productId = topicsUtils.parseProductId(bo.getTopicName());
ReportDataBo dataBo = new ReportDataBo();
dataBo.setMessage(message);
dataBo.setProductId(productId);
dataBo.setSerialNumber(bo.getSerialNumber());
dataBo.setRuleEngine(false);
return dataBo;
}
}

View File

@@ -0,0 +1,55 @@
package com.fastbee.mq.service.impl;
import com.fastbee.common.core.mq.DeviceReplyBo;
import com.fastbee.common.core.mq.InvokeReqDto;
import com.fastbee.common.core.mq.MQSendMessageBo;
import com.fastbee.common.core.mq.MessageReplyBo;
import com.fastbee.common.core.protocol.modbus.ModbusCode;
import com.fastbee.common.core.redis.RedisCache;
import com.fastbee.common.core.redis.RedisKeyBuilder;
import com.fastbee.common.enums.ThingsModelType;
import com.fastbee.common.utils.bean.BeanUtils;
import com.fastbee.iot.util.SnowflakeIdWorker;
import com.fastbee.mq.redischannel.producer.MessageProducer;
import com.fastbee.mq.service.IFunctionInvoke;
import com.fastbee.mq.service.IMqttMessagePublish;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* @author gsb
* @date 2022/12/5 11:34
*/
@Slf4j
@Service
public class FunctionInvokeImpl implements IFunctionInvoke {
@Resource
private IMqttMessagePublish mqttMessagePublish;
private SnowflakeIdWorker snowflakeIdWorker = new SnowflakeIdWorker(2);
/**
* 服务调用,设备不响应
* @param reqDto 服务下发对象
* @return 消息id messageId
*/
@Override
public String invokeNoReply(InvokeReqDto reqDto){
log.debug("=>下发指令请求:[{}]",reqDto);
MQSendMessageBo bo = new MQSendMessageBo();
BeanUtils.copyBeanProp(bo,reqDto);
long id = snowflakeIdWorker.nextId();
String messageId = id+"";
bo.setMessageId(messageId+"");
bo.setType(ThingsModelType.getType(reqDto.getType()));
mqttMessagePublish.funcSend(bo);
return messageId;
}
}

View File

@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>fastbee-gateway</artifactId>
<groupId>com.fastbee</groupId>
<version>3.8.5</version>
</parent>
<artifactId>gateway-boot</artifactId>
<description>网关模块</description>
<dependencies>
<dependency>
<groupId>com.fastbee</groupId>
<artifactId>fastbee-mq</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,42 @@
package com.fastbee.gateway.boot.start;
import com.fastbee.mq.mqttClient.PubMqttClient;
import com.fastbee.mq.redischannel.listen.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 启动类
*
* @author bill
*/
@Component
@Slf4j
@Order(2)
public class StartBoot implements ApplicationRunner {
@Autowired
private PubMqttClient mqttClient;
@Resource
private DeviceOtherListen otherListen;
@Override
public void run(ApplicationArguments args) throws Exception {
try {
otherListen.listen();
/*启动内部客户端,用来下发客户端服务*/
mqttClient.initialize();
log.info("=>设备监听队列启动成功");
} catch (Exception e) {
log.error("=>客户端启动失败:{}", e.getMessage(),e);
}
}
}

View File

@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>fastbee</artifactId>
<groupId>com.fastbee</groupId>
<version>3.8.5</version>
</parent>
<packaging>pom</packaging>
<modules>
<module>gateway-boot</module>
<module>fastbee-mq</module>
</modules>
<artifactId>fastbee-gateway</artifactId>
<name>fastbee-gateway</name>
<dependencies>
<dependency>
<groupId>com.fastbee</groupId>
<artifactId>fastbee-common</artifactId>
</dependency>
<dependency>
<groupId>com.fastbee</groupId>
<artifactId>fastbee-iot-service</artifactId>
</dependency>
</dependencies>
</project>