mirror of
https://gitee.com/beecue/fastbee.git
synced 2025-12-17 16:36:03 +08:00
1.代码更新
This commit is contained in:
@@ -20,6 +20,12 @@
|
||||
<groupId>cn.hutool</groupId>
|
||||
<artifactId>hutool-all</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fastbee</groupId>
|
||||
<artifactId>fastbee-mqtt-client</artifactId>
|
||||
<version>3.8.5</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
@@ -1,58 +0,0 @@
|
||||
package com.fastbee.mq.mqttClient;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/** mqtt配置信息*/
|
||||
|
||||
@Data
|
||||
@Component
|
||||
@ConfigurationProperties(prefix = "spring.mqtt")
|
||||
public class MqttClientConfig {
|
||||
|
||||
/**
|
||||
* 用户名
|
||||
*/
|
||||
private String username;
|
||||
/**
|
||||
* 密码
|
||||
*/
|
||||
private String password;
|
||||
/**
|
||||
* 连接地址
|
||||
*/
|
||||
private String hostUrl;
|
||||
/**
|
||||
* 客户Id
|
||||
*/
|
||||
private String clientId;
|
||||
/**
|
||||
* 默认连接话题
|
||||
*/
|
||||
private String defaultTopic;
|
||||
/**
|
||||
* 超时时间
|
||||
*/
|
||||
private int timeout;
|
||||
/**
|
||||
* 保持连接数
|
||||
*/
|
||||
private int keepalive;
|
||||
|
||||
/**是否清除session*/
|
||||
private boolean clearSession;
|
||||
/**是否共享订阅*/
|
||||
private boolean isShared;
|
||||
/**分组共享订阅*/
|
||||
private boolean isSharedGroup;
|
||||
|
||||
/**
|
||||
* true: 使用netty搭建的mqttBroker false: 使用emq
|
||||
*/
|
||||
@Value("${server.broker.enabled}")
|
||||
private Boolean enabled;
|
||||
|
||||
}
|
||||
|
||||
@@ -1,87 +0,0 @@
|
||||
package com.fastbee.mq.mqttClient;
|
||||
|
||||
import com.fastbee.common.constant.FastBeeConstant;
|
||||
import com.fastbee.common.core.mq.DeviceReportBo;
|
||||
import com.fastbee.common.enums.ServerType;
|
||||
import com.fastbee.common.utils.DateUtils;
|
||||
import com.fastbee.common.utils.StringUtils;
|
||||
import com.fastbee.common.utils.gateway.mq.TopicsPost;
|
||||
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
|
||||
import com.fastbee.iot.ruleEngine.MsgContext;
|
||||
import com.fastbee.iot.ruleEngine.RuleProcess;
|
||||
import com.fastbee.mq.redischannel.producer.MessageProducer;
|
||||
import com.fastbee.mq.service.IDeviceReportMessageService;
|
||||
import com.fastbee.mq.service.IMessagePublishService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Arrays;
|
||||
import java.util.Objects;
|
||||
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class MqttService {
|
||||
|
||||
@Resource
|
||||
private TopicsUtils topicsUtils;
|
||||
@Resource
|
||||
private IDeviceReportMessageService deviceReportMessageService;
|
||||
|
||||
@Resource
|
||||
private RuleProcess ruleProcess;
|
||||
|
||||
|
||||
|
||||
public void subscribe(MqttAsyncClient client) throws MqttException {
|
||||
TopicsPost allPost = topicsUtils.getAllPost();
|
||||
client.subscribe(allPost.getTopics(), allPost.getQos());
|
||||
log.info("mqtt监控主题,{}", Arrays.asList(allPost.getTopics()));
|
||||
}
|
||||
|
||||
/**
|
||||
* 消息回调方法
|
||||
*
|
||||
* @param topic 主题
|
||||
* @param mqttMessage 消息体
|
||||
*/
|
||||
public void subscribeCallback(String topic, MqttMessage mqttMessage) {
|
||||
|
||||
String message = new String(mqttMessage.getPayload());
|
||||
log.info("接收消息主题 : " + topic);
|
||||
log.info("接收消息Qos : " + mqttMessage.getQos());
|
||||
log.info("接收消息内容 : " + message);
|
||||
|
||||
//这里默认设备编号长度超过9位
|
||||
String[] split = topic.split("/");
|
||||
String clientId = Arrays.stream(split).filter(imei -> imei.length() > 9).findFirst().get();
|
||||
// 规则引擎脚本处理,完成后返回结果
|
||||
MsgContext context = ruleProcess.processRuleScript(clientId, 1, topic, message);
|
||||
if (!Objects.isNull(context) && StringUtils.isNotEmpty(context.getPayload())
|
||||
&& StringUtils.isNotEmpty(context.getTopic())) {
|
||||
topic = context.getTopic();
|
||||
message = context.getPayload();
|
||||
}
|
||||
|
||||
String serialNumber = topicsUtils.parseSerialNumber(topic);
|
||||
Long productId = topicsUtils.parseProductId(topic);
|
||||
String name = topicsUtils.parseTopicName(topic);
|
||||
DeviceReportBo reportBo = DeviceReportBo.builder()
|
||||
.serialNumber(serialNumber)
|
||||
.productId(productId)
|
||||
.data(mqttMessage.getPayload())
|
||||
.platformDate(DateUtils.getNowDate())
|
||||
.topicName(topic)
|
||||
.serverType(ServerType.MQTT)
|
||||
.build();
|
||||
if (name.startsWith("property")) {
|
||||
deviceReportMessageService.parseReportMsg(reportBo);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -1,117 +0,0 @@
|
||||
package com.fastbee.mq.mqttClient;
|
||||
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.*;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* mqtt客户端回调
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public class PubMqttCallBack implements MqttCallbackExtended {
|
||||
|
||||
|
||||
/**
|
||||
* mqtt客户端
|
||||
*/
|
||||
private MqttAsyncClient client;
|
||||
/**
|
||||
* 创建客户端参数
|
||||
*/
|
||||
private MqttConnectOptions options;
|
||||
|
||||
@Resource
|
||||
private MqttService mqttService;
|
||||
|
||||
private Boolean enabled;
|
||||
|
||||
|
||||
public PubMqttCallBack(MqttAsyncClient client, MqttConnectOptions options,Boolean enabled) {
|
||||
this.client = client;
|
||||
this.options = options;
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* mqtt客户端连接
|
||||
*
|
||||
* @param cause 错误
|
||||
*/
|
||||
@Override
|
||||
public void connectionLost(Throwable cause) {
|
||||
|
||||
// 连接丢失后,一般在这里面进行重连
|
||||
log.debug("=>mqtt 连接丢失", cause);
|
||||
int count = 1;
|
||||
// int sleepTime = 0;
|
||||
boolean willConnect = true;
|
||||
while (willConnect) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
log.debug("=>连接[{}]断开,尝试重连第{}次", this.client.getServerURI(), count++);
|
||||
this.client.connect(this.options);
|
||||
log.debug("=>重连成功");
|
||||
willConnect = false;
|
||||
} catch (Exception e) {
|
||||
log.error("=>重连异常", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 客户端订阅主题回调消息
|
||||
*
|
||||
* @param topic 主题
|
||||
* @param message 消息
|
||||
*/
|
||||
@Override
|
||||
public void messageArrived(String topic, MqttMessage message) throws Exception {
|
||||
// subscribe后得到的消息会执行到这里面
|
||||
try {
|
||||
mqttService.subscribeCallback(topic, message);
|
||||
} catch (Exception e) {
|
||||
log.warn("mqtt 订阅消息异常", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliveryComplete(IMqttDeliveryToken token) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 监听mqtt连接消息
|
||||
*/
|
||||
@Override
|
||||
public void connectComplete(boolean reconnect, String serverURI) {
|
||||
log.info("MQTT内部客户端已经连接!");
|
||||
System.out.print("" +
|
||||
" * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * \n" +
|
||||
" * _⚲_⚲_ ______ _ ____ * \n" +
|
||||
" * | / \\ | | ____| | | | _ \\ * \n" +
|
||||
" * | | | ● | | | | |__ __ _ ___| |_ | |_) | ___ ___ * \n" +
|
||||
" * | \\ / | | __/ _` / __| __| | _ < / _ \\/ _ \\ * \n" +
|
||||
" * \\ / | | | (_| \\__ \\ |_ | |_) | __/ __/ * \n" +
|
||||
" * V |_| \\__,_|___/\\__| |____/ \\___|\\___| * \n" +
|
||||
" * * \n"+
|
||||
" * * * * * * * * * * * * FastBee物联网平台[✔启动成功] * * * * * * * * * * * * \n");
|
||||
|
||||
//连接后订阅, enable为false表示使用emq
|
||||
if (!enabled) {
|
||||
try {
|
||||
mqttService.subscribe(client);
|
||||
} catch (MqttException e) {
|
||||
log.error("=>订阅主题失败 error={}", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,225 +0,0 @@
|
||||
package com.fastbee.mq.mqttClient;
|
||||
|
||||
import com.fastbee.common.constant.FastBeeConstant;
|
||||
import com.fastbee.common.core.redis.RedisCache;
|
||||
import com.fastbee.common.enums.FunctionReplyStatus;
|
||||
import com.fastbee.common.exception.ServiceException;
|
||||
import com.fastbee.iot.domain.FunctionLog;
|
||||
import com.fastbee.iot.service.IFunctionLogService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.*;
|
||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* 发布服务mqtt客户端
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class PubMqttClient {
|
||||
|
||||
@Resource
|
||||
private MqttClientConfig mqttConfig;
|
||||
@Resource(name = "pubMqttCallBack")
|
||||
private PubMqttCallBack mqttCallBack;
|
||||
/**
|
||||
* 连接配置
|
||||
*/
|
||||
private MqttConnectOptions options;
|
||||
/**
|
||||
* MQTT异步客户端
|
||||
*/
|
||||
private MqttAsyncClient client;
|
||||
/**
|
||||
* 是否连接标记
|
||||
*/
|
||||
private boolean isConnected = false;
|
||||
@Resource
|
||||
private RedisCache redisCache;
|
||||
@Resource
|
||||
private IFunctionLogService functionLogService;
|
||||
|
||||
/**
|
||||
* 启动MQTT客户端
|
||||
*/
|
||||
public synchronized void initialize() {
|
||||
|
||||
try {
|
||||
setOptions();
|
||||
createClient();
|
||||
while (!client.isConnected()) {
|
||||
IMqttToken token = client.connect(options);
|
||||
if(token != null && token.isComplete()) {
|
||||
log.debug("=>内部MQTT客户端启动成功");
|
||||
this.isConnected = true;
|
||||
break;
|
||||
}
|
||||
log.debug("=>内部mqtt客户端连接中...");
|
||||
Thread.sleep(20000);
|
||||
}
|
||||
} catch (MqttException ex) {
|
||||
log.error("=>MQTT客户端初始化异常", ex);
|
||||
} catch (Exception e) {
|
||||
log.error("=>连接MQTT服务器异常", e);
|
||||
this.isConnected = false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public boolean isConnected() {
|
||||
return this.isConnected;
|
||||
}
|
||||
|
||||
private void createClient() {
|
||||
try {
|
||||
if (client == null) {
|
||||
/*host为主机名,clientId是连接MQTT的客户端ID*/
|
||||
client = new MqttAsyncClient(mqttConfig.getHostUrl(), getClientId(), new MemoryPersistence());
|
||||
//设置回调函数
|
||||
client.setCallback(mqttCallBack);
|
||||
mqttCallBack.setClient(client);
|
||||
mqttCallBack.setOptions(this.options);
|
||||
mqttCallBack.setEnabled(mqttConfig.getEnabled());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("=>mqtt客户端创建错误");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置连接属性
|
||||
*/
|
||||
private void setOptions() {
|
||||
|
||||
if (options != null) {
|
||||
options = null;
|
||||
}
|
||||
options = new MqttConnectOptions();
|
||||
options.setConnectionTimeout(mqttConfig.getTimeout());
|
||||
options.setKeepAliveInterval(mqttConfig.getKeepalive());
|
||||
options.setUserName(mqttConfig.getUsername());
|
||||
options.setPassword(mqttConfig.getPassword().toCharArray());
|
||||
//设置自动重新连接
|
||||
options.setAutomaticReconnect(true);
|
||||
/*设置为false,断开连接,不清除session,重连后还是原来的session
|
||||
保留订阅的主题,能接收离线期间的消息*/
|
||||
options.setCleanSession(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* 断开与mqtt的连接
|
||||
*/
|
||||
public synchronized void disconnect() {
|
||||
//判断客户端是否null 是否连接
|
||||
if (client != null && client.isConnected()) {
|
||||
try {
|
||||
IMqttToken token = client.disconnect();
|
||||
token.waitForCompletion();
|
||||
} catch (MqttException e) {
|
||||
log.error("=>断开mqtt连接发生错误 message={}", e.getMessage());
|
||||
throw new ServiceException("断开mqtt连接发生错误" + e.getMessage());
|
||||
}
|
||||
}
|
||||
client = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 重新连接MQTT
|
||||
*/
|
||||
public synchronized void refresh() {
|
||||
disconnect();
|
||||
initialize();
|
||||
}
|
||||
|
||||
/**
|
||||
* 拼接客户端id
|
||||
*/
|
||||
public final String getClientId() {
|
||||
return FastBeeConstant.SERVER.WM_PREFIX + System.currentTimeMillis();
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布qos=1,非持久化
|
||||
*/
|
||||
public void publish(String topic, byte[] pushMessage, FunctionLog log) {
|
||||
try {
|
||||
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SEND_TOTAL, -1L);
|
||||
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SEND_TODAY, 60 * 60 * 24);
|
||||
publish(pushMessage, topic, false, 0);
|
||||
if (null != log) {
|
||||
//存储服务下发成功
|
||||
log.setResultMsg(FunctionReplyStatus.NORELY.getMessage());
|
||||
log.setResultCode(FunctionReplyStatus.NORELY.getCode());
|
||||
functionLogService.insertFunctionLog(log);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (null != log) {
|
||||
//服务下发失败存储
|
||||
log.setResultMsg(FunctionReplyStatus.FAIl.getMessage() + "原因: " + e.getMessage());
|
||||
log.setResultCode(FunctionReplyStatus.FAIl.getCode());
|
||||
functionLogService.insertFunctionLog(log);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布主题
|
||||
*
|
||||
* @param message payload消息体
|
||||
* @param topic 主题
|
||||
* @param retained 是否保留消息
|
||||
* @param qos 消息质量
|
||||
* Qos1:消息发送一次,不确保
|
||||
* Qos2:至少分发一次,服务器确保接收消息进行确认
|
||||
* Qos3:只分发一次,确保消息送达和只传递一次
|
||||
*/
|
||||
public void publish(byte[] message, String topic, boolean retained, int qos) {
|
||||
//设置mqtt消息
|
||||
MqttMessage mqttMessage = new MqttMessage();
|
||||
mqttMessage.setQos(qos);
|
||||
mqttMessage.setRetained(retained);
|
||||
mqttMessage.setPayload(message);
|
||||
|
||||
IMqttDeliveryToken token;
|
||||
try {
|
||||
token = client.publish(topic, mqttMessage);
|
||||
token.waitForCompletion();
|
||||
} catch (MqttPersistenceException e) {
|
||||
log.error("=>发布主题时发生错误 topic={},message={}", topic, e.getMessage());
|
||||
throw new ServiceException(e.getMessage());
|
||||
} catch (MqttException ex) {
|
||||
throw new ServiceException(ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 发布
|
||||
*
|
||||
* @param qos 连接方式
|
||||
* @param retained 是否保留
|
||||
* @param topic 主题
|
||||
* @param pushMessage 消息体
|
||||
*/
|
||||
public void publish(int qos, boolean retained, String topic, String pushMessage) {
|
||||
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SEND_TOTAL, -1L);
|
||||
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SEND_TODAY, 60 * 60 * 24);
|
||||
log.info("发布主题[{}],发布消息[{}]" + topic,pushMessage);
|
||||
MqttMessage message = new MqttMessage();
|
||||
message.setQos(qos);
|
||||
message.setRetained(retained);
|
||||
message.setPayload(pushMessage.getBytes());
|
||||
|
||||
try {
|
||||
IMqttDeliveryToken token = client.publish(topic, message);
|
||||
token.waitForCompletion();
|
||||
} catch (MqttPersistenceException e) {
|
||||
e.printStackTrace();
|
||||
} catch (MqttException e) {
|
||||
log.error("=>发布主题时发生错误 topic={},message={}", topic, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
package com.fastbee.mq.redischannel.producer;
|
||||
|
||||
import com.fastbee.common.core.mq.DeviceReportBo;
|
||||
import com.fastbee.mqttclient.IEmqxMessageProducer;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author bill
|
||||
*/
|
||||
@Component
|
||||
public class EmqxMessageProducer implements IEmqxMessageProducer {
|
||||
@Override
|
||||
public void sendEmqxMessage(String topicName, DeviceReportBo deviceReportBo) {
|
||||
MessageProducer.sendOtherMsg(deviceReportBo);
|
||||
}
|
||||
}
|
||||
@@ -1,10 +1,6 @@
|
||||
package com.fastbee.mq.redischannel.producer;
|
||||
|
||||
import com.fastbee.common.core.mq.DeviceReportBo;
|
||||
import com.fastbee.common.core.mq.DeviceStatusBo;
|
||||
import com.fastbee.common.core.mq.MQSendMessageBo;
|
||||
import com.fastbee.common.core.mq.ota.OtaUpgradeBo;
|
||||
import com.fastbee.common.core.mq.message.DeviceDownMessage;
|
||||
import com.fastbee.mq.redischannel.queue.*;
|
||||
|
||||
/**
|
||||
@@ -12,7 +8,6 @@ import com.fastbee.mq.redischannel.queue.*;
|
||||
* @author bill
|
||||
*/
|
||||
public class MessageProducer {
|
||||
|
||||
public static void sendOtherMsg(DeviceReportBo bo){
|
||||
DeviceOtherQueue.offer(bo);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user