diff --git a/springboot/wumei-admin/src/main/resources/application.yml b/springboot/wumei-admin/src/main/resources/application.yml index 2acb126f..609849d1 100644 --- a/springboot/wumei-admin/src/main/resources/application.yml +++ b/springboot/wumei-admin/src/main/resources/application.yml @@ -63,7 +63,7 @@ spring: # 端口,默认为6379 port: 6379 # 数据库索引 - database: 0 + database: 1 # 密码 password: admin # 连接超时时间 @@ -80,13 +80,13 @@ spring: max-wait: -1ms # mqtt 配置 mqtt: - username: wumei-smart # 账号 - password: wumei-smart # 密码 - host-url: tcp://localhost:1883 # mqtt连接tcp地址 - client-id: ${random.value} # 客户端Id,不能相同,采用随机数 ${random.value} - default-topic: test # 默认主题 - timeout: 30000 # 超时时间 - keepalive: 30 # 保持连接 + username: wumei-smart # 账号 + password: wumei-smart # 密码 + host-url: tcp://localhost:1883 # mqtt连接tcp地址 + client-id: ${random.value} # 客户端Id,不能相同,采用随机数 ${random.value} + default-topic: test # 默认主题 + timeout: 60000 # 超时时间 + keepalive: 30 # 保持连接 # token配置 token: diff --git a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/controller/ToolController.java b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/controller/ToolController.java index 0c89b617..ee40d483 100644 --- a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/controller/ToolController.java +++ b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/controller/ToolController.java @@ -41,6 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Lazy; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; @@ -79,6 +80,7 @@ public class ToolController extends BaseController { @Autowired private ThingsModelServiceImpl thingsModelService; + @Lazy @Autowired private EmqxService emqxService; diff --git a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxCallback.java b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxCallback.java index f958999e..b08864db 100644 --- a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxCallback.java +++ b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxCallback.java @@ -5,6 +5,7 @@ import org.eclipse.paho.client.mqttv3.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; /** @@ -16,20 +17,16 @@ public class EmqxCallback implements MqttCallbackExtended { private static final Logger logger = LoggerFactory.getLogger(EmqxCallback.class); @Autowired - private MqttConfig mqttConfig; + private EmqxClient emqxClient; + @Lazy @Autowired private EmqxService emqxService; @Override public void connectionLost(Throwable throwable) { - try { logger.info("mqtt断开连接--"); - //EmqxService.client 添加配置 options.setAutomaticReconnect(true);会自动重连 - } catch (Exception e) { - // e.printStackTrace(); - logger.error("发生错误:"+e.getMessage()); - } + } @@ -55,7 +52,12 @@ public class EmqxCallback implements MqttCallbackExtended { */ @Override public void connectComplete(boolean reconnect, String serverURI) { - logger.info("mqtt已经重新连接!!"); + logger.info("mqtt已经连接!!"); //连接后,可以在此做初始化事件,或订阅 + try { + emqxService.subscribe(EmqxClient.client); + } catch (MqttException e) { + logger.error("======>>>>>订阅主题失败 error={}",e.getMessage()); + } } } diff --git a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxClient.java b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxClient.java index 9aad218e..4b392174 100644 --- a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxClient.java +++ b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxClient.java @@ -1,5 +1,6 @@ package com.ruoyi.iot.mqtt; +import com.ruoyi.common.exception.ServiceException; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; @@ -11,7 +12,7 @@ import org.springframework.stereotype.Component; * @Classname EmqxClient * @Description mqtt推送客户端 */ -@Component +//@Component public class EmqxClient { private static final Logger logger = LoggerFactory.getLogger(EmqxClient.class); @@ -21,61 +22,137 @@ public class EmqxClient { @Autowired private EmqxService emqxService; - public static MqttClient client; + /**MQTT异步客户端*/ + public static MqttAsyncClient client; + + /**连接配置*/ + private MqttConnectOptions options; + + /**服务器地址url*/ + private String hostname; + + /**超时时间*/ + private int timeout; + + /**包活时间*/ + private int keepalive; + + /**客户端唯一ID*/ + private String clientId; + + /**用户名*/ + private String username; + + /**密码*/ + private String password; + + /**是否清除会话*/ + private boolean clearSession; + + public EmqxClient(String clientId,String username,String password,String hostname, + int timeout,int keepalive,boolean clearSession){ + this.clientId = clientId; + this.username = username; + this.password = password; + this.hostname = hostname; + this.timeout = timeout; + this.keepalive = keepalive; + this.clearSession = clearSession; + } + /** - * 客户端连接 - * - * @param host ip+端口 - * @param clientID 客户端Id - * @param username 用户名 - * @param password 密码 - * @param timeout 超时时间 - * @param keepalive 保留数 + * 连接MQTT服务器 */ - public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) { - MqttClient client; - try { - client = new MqttClient(host, clientID, new MemoryPersistence()); - MqttConnectOptions options = new MqttConnectOptions(); - options.setCleanSession(true); - options.setUserName(username); - options.setPassword(password.toCharArray()); - options.setConnectionTimeout(timeout); - options.setKeepAliveInterval(keepalive); - //设置自动重新连接 - options.setAutomaticReconnect(true); - /*设置为false,断开连接后,不清除session,重连后仍然是之前的session, - 保留订阅的主题,能接受到离线期间的消息*/ - options.setCleanSession(false); - EmqxClient.client=client; - client.setCallback(emqxCallback); - clientConnect(options,client); - } catch (Exception e) { - e.printStackTrace(); + public synchronized void connect(){ + + /*设置配置*/ + if (options == null){ + setOptions(); + } + if (client == null){ + createClient(); + } + while (!client.isConnected()){ + try { + IMqttToken token = client.connect(options); + token.waitForCompletion(); + }catch (Exception e){ + logger.error("=====>>>>>mqtt连接失败 message={}",e.getMessage()); + throw new ServiceException("mqtt客户端连接错误"+e.getMessage()); + } } } /** - * 10秒重连一次 - * @param options - * @param client - * @throws InterruptedException + * 创建客户端 */ - public void clientConnect(MqttConnectOptions options, MqttClient client) throws InterruptedException { - try { - client.connect(options); - logger.info("mqtt连接成功"); - // 订阅主题 - emqxService.subscribe(client); - } catch (Exception e) { - logger.error("mqtt连接失败,"+e.getMessage()); - //发生错误后重新连接 - Thread.sleep(10000); - clientConnect(options,client); + private void createClient(){ + if (client == null){ + try { + /*host为主机名,clientId是连接MQTT的客户端ID,MemoryPersistence设置clientId的保存方式 + 默认是以内存方式保存*/ + client = new MqttAsyncClient(hostname,clientId,new MemoryPersistence()); + //设置回调函数 + client.setCallback(emqxCallback); + logger.debug("====>>>mqtt客户端启动成功"); + }catch (MqttException e){ + logger.error("mqtt客户端连接错误 error={}",e.getMessage()); + throw new ServiceException("mqtt客户端连接错误"+e.getMessage()); + } } } + /** + * 设置连接属性 + */ + private void setOptions(){ + if (options != null){ + options = null; + } + options = new MqttConnectOptions(); + options.setUserName(username); + options.setPassword(password.toCharArray()); + options.setConnectionTimeout(timeout); + options.setKeepAliveInterval(keepalive); + //设置自动重新连接 + options.setAutomaticReconnect(true); + /*设置为false,断开连接,不清除session,重连后还是原来的session + 保留订阅的主题,能接收离线期间的消息*/ + options.setCleanSession(clearSession); + logger.debug("====>>>>设置mqtt参数成功"); + } + + /** + * 断开与mqtt的连接 + */ + public synchronized void disconnect(){ + //判断客户端是否null 是否连接 + if (client != null && client.isConnected()){ + try { + IMqttToken token = client.disconnect(); + token.waitForCompletion(); + }catch (MqttException e){ + logger.error("====>>>>断开mqtt连接发生错误 message={}",e.getMessage()); + throw new ServiceException("断开mqtt连接发生错误" + e.getMessage()); + } + } + client = null; + } + + /** + * 重新连接MQTT + */ + public synchronized void refresh(){ + disconnect(); + setOptions(); + createClient(); + connect(); + } + + + + /** * 发布 * @param qos 连接方式 @@ -89,32 +166,39 @@ public class EmqxClient { message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); - MqttTopic mTopic = EmqxClient.client.getTopic(topic); - if (null == mTopic) { - logger.error("topic not exist"); - } - MqttDeliveryToken token; + try { - token = mTopic.publish(message); + IMqttDeliveryToken token = client.publish(topic,message); token.waitForCompletion(); } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { - e.printStackTrace(); + logger.error("=======>>>>>发布主题时发生错误 topic={},message={}",topic,e.getMessage()); } } /** * 订阅某个主题 * @param topic 主题 - * @param qos 连接方式 + * @param qos 消息质量 + * Qos1:消息发送一次,不确保 + * Qos2:至少分发一次,服务器确保接收消息进行确认 + * Qos3:只分发一次,确保消息送达和只传递一次 */ - public void subscribe(String topic, int qos) { - logger.info("订阅主题" + topic); + public void subscribe(String topic, int qos){ + logger.info("=======>>>>>订阅了主题 topic={}",topic); try { - EmqxClient.client.subscribe(topic, qos); - } catch (MqttException e) { - e.printStackTrace(); + IMqttToken token = client.subscribe(topic, qos); + token.waitForCompletion(); + }catch (MqttException e){ + logger.error("=======>>>>>订阅主题 topic={} 失败 message={}",topic,e.getMessage()); } } + + /**是否处于连接状态*/ + public boolean isConnected(){ + return client != null && client.isConnected(); + } + + public String getClientId() {return clientId;}; } \ No newline at end of file diff --git a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxService.java b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxService.java index f6b2e21a..25cd4098 100644 --- a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxService.java +++ b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxService.java @@ -10,12 +10,14 @@ import com.ruoyi.iot.model.ThingsModels.ThingsModelValueRemarkItem; import com.ruoyi.iot.model.ThingsModels.ThingsModelValuesInput; import com.ruoyi.iot.service.IDeviceLogService; import com.ruoyi.iot.service.IDeviceService; +import org.eclipse.paho.client.mqttv3.MqttAsyncClient; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.List; @@ -53,7 +55,7 @@ public class EmqxService { String pPropertyTopic = "/property/get"; String pFunctionTopic = "/function/get"; - public void subscribe(MqttClient client) throws MqttException { + public void subscribe(MqttAsyncClient client) throws MqttException { // 订阅设备信息 client.subscribe(sInfoTopic, 1); // 订阅时钟同步 @@ -71,7 +73,18 @@ public class EmqxService { logger.info("mqtt订阅了设备信息和物模型主题"); } - public void subscribeCallback(String topic, MqttMessage mqttMessage) { + /** + * 消息回调方法 + * @param topic 主题 + * @param mqttMessage 消息体 + */ + @Async + public void subscribeCallback(String topic, MqttMessage mqttMessage) throws InterruptedException { + + /**测试线程池使用*/ + logger.info("====>>>>线程名--{}",Thread.currentThread().getName()); + /**模拟耗时操作*/ + Thread.sleep(1000); // subscribe后得到的消息会执行到这里面 String message = new String(mqttMessage.getPayload()); logger.info("接收消息主题 : " + topic); diff --git a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxStart.java b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxStart.java index b9a30af9..fd2f2d0e 100644 --- a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxStart.java +++ b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/EmqxStart.java @@ -1,5 +1,6 @@ package com.ruoyi.iot.mqtt; +import com.alibaba.fastjson.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; @@ -14,10 +15,29 @@ import org.springframework.stereotype.Component; public class EmqxStart implements ApplicationRunner { @Autowired - private MqttConfig mqttConfig; + private EmqxClient emqxClient; @Override - public void run(ApplicationArguments applicationArguments){ - mqttConfig.EmqxClientStart(); + public void run(ApplicationArguments applicationArguments) throws Exception{ + emqxClient.connect(); + + /*模拟客户端发布主题,测试多线程处理订阅主题业务*/ + /** JSONObject message = new JSONObject(); + message.put("id",2); + message.put("value","hello"); + message.put("remark","12"); + Thread.sleep(10000); + new Thread(() ->{ + for (int i = 0; i < 3000; i++) { + emqxClient.publish(0,false,"/2/D6329VL54419L1Y0/info/post",message.toJSONString()); + //测试多线程性能 + try { + Thread.sleep(300); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }).start(); + */ } } diff --git a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/MqttClientConfiguration.java b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/MqttClientConfiguration.java new file mode 100644 index 00000000..25e3dbe1 --- /dev/null +++ b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/MqttClientConfiguration.java @@ -0,0 +1,25 @@ +package com.ruoyi.iot.mqtt; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class MqttClientConfiguration { + + @Autowired + private MqttConfig mqttConfig; + + @Bean + EmqxClient mqttClient(){ + EmqxClient client = new EmqxClient(mqttConfig.getclientId(), + mqttConfig.getusername(), + mqttConfig.getpassword(), + mqttConfig.gethostUrl(), + mqttConfig.gettimeout(), + mqttConfig.getkeepalive(), + mqttConfig.isClearSession() + ); + return client; + } +} diff --git a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/MqttConfig.java b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/MqttConfig.java index cb614bd3..1542ef7d 100644 --- a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/MqttConfig.java +++ b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/mqtt/MqttConfig.java @@ -18,9 +18,6 @@ public class MqttConfig { private static final Logger logger = LoggerFactory.getLogger(MqttConfig.class); - @Autowired - private EmqxClient emqxClient; - /** * 用户名 */ @@ -50,6 +47,13 @@ public class MqttConfig { */ private int keepalive; + /**是否清除session*/ + private boolean clearSession; + /**是否共享订阅*/ + private boolean isShared; + /**分组共享订阅*/ + private boolean isSharedGroup; + public String getusername() { @@ -93,9 +97,27 @@ public class MqttConfig { } public void setkeepalive(int keepalive) {this.keepalive = keepalive;} - // @Bean - public void EmqxClientStart() { - logger.info("mqtt启动中..."); - emqxClient.connect(gethostUrl(), getclientId(), getusername(), getpassword(), gettimeout(), getkeepalive()); + public boolean isClearSession() { + return clearSession; + } + + public void setClearSession(boolean clearSession) { + this.clearSession = clearSession; + } + + public boolean isShared() { + return isShared; + } + + public void setShared(boolean shared) { + isShared = shared; + } + + public boolean isSharedGroup() { + return isSharedGroup; + } + + public void setSharedGroup(boolean sharedGroup) { + isSharedGroup = sharedGroup; } } diff --git a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/task/IotAsyncExceptionHander.java b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/task/IotAsyncExceptionHander.java new file mode 100644 index 00000000..5073bee0 --- /dev/null +++ b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/task/IotAsyncExceptionHander.java @@ -0,0 +1,22 @@ +package com.ruoyi.iot.task; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; +import org.springframework.stereotype.Component; + +import java.lang.reflect.Method; + + +@Component +public class IotAsyncExceptionHander implements AsyncUncaughtExceptionHandler { + + private static final Logger logger = LoggerFactory.getLogger(IotAsyncExceptionHander.class); + + @Override + public void handleUncaughtException(Throwable ex, Method method, Object... params) { + logger.debug("=======>>>>>捕获线程异常信息"); + logger.error("======>>>>>> error message-{},method-name",ex.getMessage(),method.getName()); + + } +} diff --git a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/task/TaskConfigurer.java b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/task/TaskConfigurer.java new file mode 100644 index 00000000..602dfcce --- /dev/null +++ b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/task/TaskConfigurer.java @@ -0,0 +1,51 @@ +package com.ruoyi.iot.task; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.AsyncConfigurer; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +@Configuration +@ComponentScan("com.ruoyi.iot") +//注解开启异步任务支持 +@EnableAsync +public class TaskConfigurer implements AsyncConfigurer { + + private static final Logger logger = LoggerFactory.getLogger(TaskConfigurer.class); + + @Autowired + private TaskExecutorConfig config; + + @Bean + TaskExecutorConfig config(){ + return new TaskExecutorConfig(); + } + + @Override + public Executor getAsyncExecutor() { + + ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + taskExecutor.setCorePoolSize(config.getCoreSize()); + taskExecutor.setMaxPoolSize(config.getMaxSize()); + taskExecutor.setQueueCapacity(config.getQueueCapacity()); + taskExecutor.setThreadNamePrefix("iot-mqtt-"); + taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); + taskExecutor.initialize(); //数据初始化 + logger.debug("=====>>>>taskExecutor init"); + return taskExecutor; + } + + @Override + public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { + return AsyncConfigurer.super.getAsyncUncaughtExceptionHandler(); + } +} diff --git a/springboot/wumei-iot/src/main/java/com/ruoyi/iot/task/TaskExecutorConfig.java b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/task/TaskExecutorConfig.java new file mode 100644 index 00000000..bd971d1a --- /dev/null +++ b/springboot/wumei-iot/src/main/java/com/ruoyi/iot/task/TaskExecutorConfig.java @@ -0,0 +1,41 @@ +package com.ruoyi.iot.task; + + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Component +@ConfigurationProperties(prefix = "spring.task.execution.pool") +public class TaskExecutorConfig { + + + private int coreSize; + + private int maxSize; + + private int queueCapacity; + + public int getCoreSize() { + return coreSize; + } + + public void setCoreSize(int coreSize) { + this.coreSize = coreSize; + } + + public int getMaxSize() { + return maxSize; + } + + public void setMaxSize(int maxSize) { + this.maxSize = maxSize; + } + + public int getQueueCapacity() { + return queueCapacity; + } + + public void setQueueCapacity(int queueCapacity) { + this.queueCapacity = queueCapacity; + } +}