mirror of
https://gitee.com/beecue/fastbee.git
synced 2025-12-19 17:35:54 +08:00
发布v1.1版本
This commit is contained in:
@@ -0,0 +1,54 @@
|
||||
package com.ruoyi.iot.mqtt;
|
||||
|
||||
import com.ruoyi.framework.web.domain.server.Sys;
|
||||
import org.eclipse.paho.client.mqttv3.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @Classname MqttCallback
|
||||
* @Description 消费监听类
|
||||
*/
|
||||
@Component
|
||||
public class EmqxCallback implements MqttCallback {
|
||||
private static final Logger logger = LoggerFactory.getLogger(EmqxCallback.class);
|
||||
|
||||
@Autowired
|
||||
private MqttConfig mqttConfig;
|
||||
|
||||
@Autowired
|
||||
private EmqxService emqxService;
|
||||
|
||||
@Override
|
||||
public void connectionLost(Throwable throwable) {
|
||||
try {
|
||||
// 重连mqtt
|
||||
while(true) {
|
||||
logger.info("mqtt连接断开,重新连接中...");
|
||||
EmqxClient.client.reconnect();
|
||||
Thread.sleep(10000);
|
||||
if(EmqxClient.client.isConnected()){
|
||||
logger.info("mqtt已经重新连接");
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (MqttException | InterruptedException e) {
|
||||
// e.printStackTrace();
|
||||
logger.error("发生错误:"+e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
|
||||
emqxService.subscribeCallback(topic,mqttMessage);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
||||
// 消息到达 MQTT 代理时触发的事件
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,115 @@
|
||||
package com.ruoyi.iot.mqtt;
|
||||
|
||||
import org.eclipse.paho.client.mqttv3.*;
|
||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @Classname EmqxClient
|
||||
* @Description mqtt推送客户端
|
||||
*/
|
||||
@Component
|
||||
public class EmqxClient {
|
||||
private static final Logger logger = LoggerFactory.getLogger(EmqxClient.class);
|
||||
|
||||
@Autowired
|
||||
private EmqxCallback emqxCallback;
|
||||
|
||||
@Autowired
|
||||
private EmqxService emqxService;
|
||||
|
||||
public static MqttClient client;
|
||||
|
||||
/**
|
||||
* 客户端连接
|
||||
*
|
||||
* @param host ip+端口
|
||||
* @param clientID 客户端Id
|
||||
* @param username 用户名
|
||||
* @param password 密码
|
||||
* @param timeout 超时时间
|
||||
* @param keepalive 保留数
|
||||
*/
|
||||
public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
|
||||
MqttClient client;
|
||||
try {
|
||||
client = new MqttClient(host, clientID, new MemoryPersistence());
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setCleanSession(true);
|
||||
options.setUserName(username);
|
||||
options.setPassword(password.toCharArray());
|
||||
options.setConnectionTimeout(timeout);
|
||||
options.setKeepAliveInterval(keepalive);
|
||||
EmqxClient.client=client;
|
||||
client.setCallback(emqxCallback);
|
||||
clientConnect(options,client);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 10秒重连一次
|
||||
* @param options
|
||||
* @param client
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public void clientConnect(MqttConnectOptions options, MqttClient client) throws InterruptedException {
|
||||
try {
|
||||
client.connect(options);
|
||||
logger.info("mqtt连接成功");
|
||||
// 订阅主题
|
||||
emqxService.subscribe(client);
|
||||
} catch (Exception e) {
|
||||
logger.error("mqtt连接失败,"+e.getMessage());
|
||||
//发生错误后重新连接
|
||||
Thread.sleep(10000);
|
||||
clientConnect(options,client);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布
|
||||
* @param qos 连接方式
|
||||
* @param retained 是否保留
|
||||
* @param topic 主题
|
||||
* @param pushMessage 消息体
|
||||
*/
|
||||
public void publish(int qos, boolean retained, String topic, String pushMessage) {
|
||||
logger.info("发布主题" + topic);
|
||||
MqttMessage message = new MqttMessage();
|
||||
message.setQos(qos);
|
||||
message.setRetained(retained);
|
||||
message.setPayload(pushMessage.getBytes());
|
||||
MqttTopic mTopic = EmqxClient.client.getTopic(topic);
|
||||
if (null == mTopic) {
|
||||
logger.error("topic not exist");
|
||||
}
|
||||
MqttDeliveryToken token;
|
||||
try {
|
||||
token = mTopic.publish(message);
|
||||
token.waitForCompletion();
|
||||
} catch (MqttPersistenceException e) {
|
||||
e.printStackTrace();
|
||||
} catch (MqttException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 订阅某个主题
|
||||
* @param topic 主题
|
||||
* @param qos 连接方式
|
||||
*/
|
||||
public void subscribe(String topic, int qos) {
|
||||
logger.info("订阅主题" + topic);
|
||||
try {
|
||||
EmqxClient.client.subscribe(topic, qos);
|
||||
} catch (MqttException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,211 @@
|
||||
package com.ruoyi.iot.mqtt;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.ruoyi.iot.domain.Device;
|
||||
import com.ruoyi.iot.domain.DeviceLog;
|
||||
import com.ruoyi.iot.model.NtpModel;
|
||||
import com.ruoyi.iot.model.ThingsModels.IdentityAndName;
|
||||
import com.ruoyi.iot.model.ThingsModels.ThingsModelValueItem;
|
||||
import com.ruoyi.iot.model.ThingsModels.ThingsModelValueRemarkItem;
|
||||
import com.ruoyi.iot.model.ThingsModels.ThingsModelValuesInput;
|
||||
import com.ruoyi.iot.service.IDeviceLogService;
|
||||
import com.ruoyi.iot.service.IDeviceService;
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Service
|
||||
public class EmqxService {
|
||||
private static final Logger logger = LoggerFactory.getLogger(EmqxService.class);
|
||||
|
||||
@Autowired
|
||||
private EmqxClient emqxClient;
|
||||
|
||||
@Autowired
|
||||
private IDeviceService deviceService;
|
||||
|
||||
@Autowired
|
||||
private IDeviceLogService deviceLogService;
|
||||
|
||||
/** 订阅的主题 */
|
||||
private static final String prefix="/+/+/";
|
||||
String sInfoTopic = prefix + "info/post";
|
||||
String sNtpTopic = prefix + "ntp/post";
|
||||
String sPropertyTopic = prefix + "property/post";
|
||||
String sFunctionTopic = prefix + "function/post";
|
||||
String sEventTopic = prefix + "event/post";
|
||||
String sShadowPropertyTopic = prefix + "property-offline/post";
|
||||
String sShadowFunctionTopic = prefix + "function-offline/post";
|
||||
|
||||
/** 发布的主题 */
|
||||
String pStatusTopic = "/status/post";
|
||||
String pNtpTopic = "/ntp/get";
|
||||
String pPropertyTopic = "/property/get";
|
||||
String pFunctionTopic = "/function/get";
|
||||
|
||||
public void subscribe(MqttClient client) throws MqttException {
|
||||
// 订阅设备信息
|
||||
client.subscribe(sInfoTopic, 1);
|
||||
// 订阅时钟同步
|
||||
client.subscribe(sNtpTopic, 1);
|
||||
// 订阅设备属性
|
||||
client.subscribe(sPropertyTopic,1);
|
||||
// 订阅设备功能
|
||||
client.subscribe(sFunctionTopic,1);
|
||||
// 订阅设备事件
|
||||
client.subscribe(sEventTopic,1);
|
||||
// 订阅属性(影子模式)
|
||||
client.subscribe(sShadowPropertyTopic,1);
|
||||
// 订阅功能(影子模式)
|
||||
client.subscribe(sShadowFunctionTopic,1);
|
||||
logger.info("mqtt订阅了设备信息和物模型主题");
|
||||
}
|
||||
|
||||
public void subscribeCallback(String topic, MqttMessage mqttMessage){
|
||||
// subscribe后得到的消息会执行到这里面
|
||||
String message=new String(mqttMessage.getPayload());
|
||||
logger.info("接收消息主题 : " + topic);
|
||||
logger.info("接收消息Qos : " + mqttMessage.getQos());
|
||||
logger.info("接收消息内容 : " + message);
|
||||
|
||||
String[] topicItem=topic.substring(1).split("/");
|
||||
Long productId= Long.valueOf(topicItem[0]);
|
||||
String deviceNum=topicItem[1];
|
||||
String name=topicItem[2];
|
||||
switch (name){
|
||||
case "info":
|
||||
reportDevice(productId,deviceNum,message);
|
||||
break;
|
||||
case "ntp":
|
||||
publishNtp(productId,deviceNum,message);
|
||||
break;
|
||||
case "property":
|
||||
reportProperty(productId,deviceNum,message,false);
|
||||
break;
|
||||
case "function":
|
||||
reportFunction(productId,deviceNum,message,false);
|
||||
break;
|
||||
case "event":
|
||||
reportEvent(productId,deviceNum,message);
|
||||
break;
|
||||
case "property-offline":
|
||||
reportProperty(productId,deviceNum,message,true);
|
||||
break;
|
||||
case "function-offline":
|
||||
reportFunction(productId,deviceNum,message,true);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 上报设备信息
|
||||
*/
|
||||
private void reportDevice(Long productId,String deviceNum,String message){
|
||||
Device device=JSON.parseObject(message,Device.class);
|
||||
device.setProductId(productId);
|
||||
device.setSerialNumber(deviceNum);
|
||||
deviceService.reportDevice(device);
|
||||
}
|
||||
|
||||
/**
|
||||
* 上报属性
|
||||
* @param message
|
||||
*/
|
||||
private void reportProperty(Long productId,String deviceNum,String message,boolean isShadow){
|
||||
List<ThingsModelValueRemarkItem> thingsModelValueRemarkItems=JSON.parseArray(message, ThingsModelValueRemarkItem.class);
|
||||
ThingsModelValuesInput input=new ThingsModelValuesInput();
|
||||
input.setProductId(productId);
|
||||
input.setDeviceNumber(deviceNum);
|
||||
input.setThingsModelValueRemarkItem(thingsModelValueRemarkItems);
|
||||
deviceService.reportDeviceThingsModelValue(input,1,isShadow);
|
||||
}
|
||||
|
||||
/**
|
||||
* 上报功能
|
||||
* @param message
|
||||
*/
|
||||
private void reportFunction(Long productId,String deviceNum,String message,boolean isShadow){
|
||||
List<ThingsModelValueRemarkItem> thingsModelValueRemarkItems=JSON.parseArray(message, ThingsModelValueRemarkItem.class);
|
||||
ThingsModelValuesInput input=new ThingsModelValuesInput();
|
||||
input.setProductId(productId);
|
||||
input.setDeviceNumber(deviceNum);
|
||||
input.setThingsModelValueRemarkItem(thingsModelValueRemarkItems);
|
||||
deviceService.reportDeviceThingsModelValue(input,2,isShadow);
|
||||
}
|
||||
|
||||
/**
|
||||
* 上报事件
|
||||
* @param message
|
||||
*/
|
||||
private void reportEvent(Long productId,String deviceNum,String message){
|
||||
List<ThingsModelValueRemarkItem> thingsModelValueRemarkItems=JSON.parseArray(message, ThingsModelValueRemarkItem.class);
|
||||
Device device =deviceService.selectDeviceBySerialNumber(deviceNum);
|
||||
for(int i=0;i<thingsModelValueRemarkItems.size();i++) {
|
||||
// 添加到设备日志
|
||||
DeviceLog deviceLog = new DeviceLog();
|
||||
deviceLog.setDeviceId(device.getDeviceId());
|
||||
deviceLog.setDeviceName(device.getDeviceName());
|
||||
deviceLog.setLogValue(thingsModelValueRemarkItems.get(i).getValue());
|
||||
deviceLog.setRemark(thingsModelValueRemarkItems.get(i).getRemark());
|
||||
deviceLog.setSerialNumber(device.getSerialNumber());
|
||||
deviceLog.setIdentity(thingsModelValueRemarkItems.get(i).getId());
|
||||
deviceLog.setLogType(3);
|
||||
deviceLog.setIsMonitor(0);
|
||||
deviceLogService.insertDeviceLog(deviceLog);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 1.发布设备状态
|
||||
*/
|
||||
public void publishStatus(Long productId,String deviceNum,int deviceStatus,int isShadow){
|
||||
String message="{\"status\":"+deviceStatus+",\"isShadow\":"+isShadow+"}";
|
||||
emqxClient.publish(1,false,"/"+productId+"/"+deviceNum+pStatusTopic, message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 2.发布时钟同步信息
|
||||
* @param message
|
||||
*/
|
||||
private void publishNtp(Long productId,String deviceNum,String message){
|
||||
NtpModel ntpModel=JSON.parseObject(message,NtpModel.class);
|
||||
ntpModel.setServerRecvTime(System.currentTimeMillis());
|
||||
ntpModel.setServerSendTime(System.currentTimeMillis());
|
||||
emqxClient.publish(1, false, "/"+productId+"/"+deviceNum+pNtpTopic, JSON.toJSONString(ntpModel));
|
||||
}
|
||||
|
||||
/**
|
||||
* 3.发布属性
|
||||
*/
|
||||
public void publishProperty(Long productId,String deviceNum,List<IdentityAndName> thingsList){
|
||||
if(thingsList==null){
|
||||
emqxClient.publish(1,true,"/"+productId+"/"+deviceNum+pPropertyTopic, "");
|
||||
}else{
|
||||
emqxClient.publish(1,true,"/"+productId+"/"+deviceNum+pPropertyTopic, JSON.toJSONString(thingsList));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 4.发布功能
|
||||
*/
|
||||
public void publishFunction(Long productId,String deviceNum,List<IdentityAndName> thingsList){
|
||||
if(thingsList==null){
|
||||
emqxClient.publish(1,true,"/"+productId+"/"+deviceNum+pFunctionTopic, "");
|
||||
}else{
|
||||
emqxClient.publish(1,true,"/"+productId+"/"+deviceNum+pFunctionTopic, JSON.toJSONString(thingsList));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
package com.ruoyi.iot.mqtt;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
*项目启动执行
|
||||
*/
|
||||
@Component
|
||||
@Order(value = 1) //执行顺序控制
|
||||
public class EmqxStart implements ApplicationRunner {
|
||||
|
||||
@Autowired
|
||||
private MqttConfig mqttConfig;
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments applicationArguments){
|
||||
mqttConfig.EmqxClientStart();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,101 @@
|
||||
package com.ruoyi.iot.mqtt;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @Classname MqttConfig
|
||||
* @Description mqtt配置信息
|
||||
* @author kerwincui
|
||||
*/
|
||||
@Component
|
||||
@ConfigurationProperties("spring.mqtt")
|
||||
public class MqttConfig {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MqttConfig.class);
|
||||
|
||||
@Autowired
|
||||
private EmqxClient emqxClient;
|
||||
|
||||
/**
|
||||
* 用户名
|
||||
*/
|
||||
private String username;
|
||||
/**
|
||||
* 密码
|
||||
*/
|
||||
private String password;
|
||||
/**
|
||||
* 连接地址
|
||||
*/
|
||||
private String hostUrl;
|
||||
/**
|
||||
* 客户Id
|
||||
*/
|
||||
private String clientId;
|
||||
/**
|
||||
* 默认连接话题
|
||||
*/
|
||||
private String defaultTopic;
|
||||
/**
|
||||
* 超时时间
|
||||
*/
|
||||
private int timeout;
|
||||
/**
|
||||
* 保持连接数
|
||||
*/
|
||||
private int keepalive;
|
||||
|
||||
|
||||
public String getusername()
|
||||
{
|
||||
return username;
|
||||
}
|
||||
public void setusername(String username) {this.username = username;}
|
||||
|
||||
public String getpassword()
|
||||
{
|
||||
return password;
|
||||
}
|
||||
public void setpassword(String password) {this.password = password;}
|
||||
|
||||
public String gethostUrl()
|
||||
{
|
||||
return hostUrl;
|
||||
}
|
||||
public void sethostUrl(String hostUrl) {this.hostUrl = hostUrl;}
|
||||
|
||||
public String getclientId()
|
||||
{
|
||||
return "server-"+clientId;
|
||||
}
|
||||
public void setclientId(String clientId) {this.clientId = clientId;}
|
||||
|
||||
public String getdefaultTopic()
|
||||
{
|
||||
return defaultTopic;
|
||||
}
|
||||
public void setdefaultTopic(String defaultTopic) {this.defaultTopic = defaultTopic;}
|
||||
|
||||
public int gettimeout()
|
||||
{
|
||||
return timeout;
|
||||
}
|
||||
public void settimeout(int timeout) {this.timeout = timeout;}
|
||||
|
||||
public int getkeepalive()
|
||||
{
|
||||
return keepalive;
|
||||
}
|
||||
public void setkeepalive(int keepalive) {this.keepalive = keepalive;}
|
||||
|
||||
// @Bean
|
||||
public void EmqxClientStart() {
|
||||
logger.info("mqtt启动中...");
|
||||
emqxClient.connect(gethostUrl(), getclientId(), getusername(), getpassword(), gettimeout(), getkeepalive());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user