!6 mqtt异步客户端+接收主题的业务在线程池处理

Merge pull request !6 from guanshubiao/master
This commit is contained in:
随遇而安
2022-03-22 16:16:40 +00:00
committed by Gitee
11 changed files with 368 additions and 86 deletions

View File

@@ -63,7 +63,7 @@ spring:
# 端口默认为6379
port: 6379
# 数据库索引
database: 0
database: 1
# 密码
password: admin
# 连接超时时间
@@ -80,13 +80,13 @@ spring:
max-wait: -1ms
# mqtt 配置
mqtt:
username: wumei-smart # 账号
password: wumei-smart # 密码
host-url: tcp://localhost:1883 # mqtt连接tcp地址
client-id: ${random.value} # 客户端Id不能相同采用随机数 ${random.value}
default-topic: test # 默认主题
timeout: 30000 # 超时时间
keepalive: 30 # 保持连接
username: wumei-smart # 账号
password: wumei-smart # 密码
host-url: tcp://localhost:1883 # mqtt连接tcp地址
client-id: ${random.value} # 客户端Id不能相同采用随机数 ${random.value}
default-topic: test # 默认主题
timeout: 60000 # 超时时间
keepalive: 30 # 保持连接
# token配置
token:

View File

@@ -41,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@@ -79,6 +80,7 @@ public class ToolController extends BaseController {
@Autowired
private ThingsModelServiceImpl thingsModelService;
@Lazy
@Autowired
private EmqxService emqxService;

View File

@@ -5,6 +5,7 @@ import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
/**
@@ -16,20 +17,16 @@ public class EmqxCallback implements MqttCallbackExtended {
private static final Logger logger = LoggerFactory.getLogger(EmqxCallback.class);
@Autowired
private MqttConfig mqttConfig;
private EmqxClient emqxClient;
@Lazy
@Autowired
private EmqxService emqxService;
@Override
public void connectionLost(Throwable throwable) {
try {
logger.info("mqtt断开连接--");
//EmqxService.client 添加配置 options.setAutomaticReconnect(true);会自动重连
} catch (Exception e) {
// e.printStackTrace();
logger.error("发生错误:"+e.getMessage());
}
}
@@ -55,7 +52,12 @@ public class EmqxCallback implements MqttCallbackExtended {
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
logger.info("mqtt已经重新连接!!");
logger.info("mqtt已经连接");
//连接后,可以在此做初始化事件,或订阅
try {
emqxService.subscribe(EmqxClient.client);
} catch (MqttException e) {
logger.error("======>>>>>订阅主题失败 error={}",e.getMessage());
}
}
}

View File

@@ -1,5 +1,6 @@
package com.ruoyi.iot.mqtt;
import com.ruoyi.common.exception.ServiceException;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
@@ -11,7 +12,7 @@ import org.springframework.stereotype.Component;
* @Classname EmqxClient
* @Description mqtt推送客户端
*/
@Component
//@Component
public class EmqxClient {
private static final Logger logger = LoggerFactory.getLogger(EmqxClient.class);
@@ -21,61 +22,137 @@ public class EmqxClient {
@Autowired
private EmqxService emqxService;
public static MqttClient client;
/**MQTT异步客户端*/
public static MqttAsyncClient client;
/**连接配置*/
private MqttConnectOptions options;
/**服务器地址url*/
private String hostname;
/**超时时间*/
private int timeout;
/**包活时间*/
private int keepalive;
/**客户端唯一ID*/
private String clientId;
/**用户名*/
private String username;
/**密码*/
private String password;
/**是否清除会话*/
private boolean clearSession;
public EmqxClient(String clientId,String username,String password,String hostname,
int timeout,int keepalive,boolean clearSession){
this.clientId = clientId;
this.username = username;
this.password = password;
this.hostname = hostname;
this.timeout = timeout;
this.keepalive = keepalive;
this.clearSession = clearSession;
}
/**
* 客户端连接
*
* @param host ip+端口
* @param clientID 客户端Id
* @param username 用户名
* @param password 密码
* @param timeout 超时时间
* @param keepalive 保留数
* 连接MQTT服务器
*/
public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
MqttClient client;
try {
client = new MqttClient(host, clientID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
//设置自动重新连接
options.setAutomaticReconnect(true);
/*设置为false断开连接后不清除session重连后仍然是之前的session
保留订阅的主题,能接受到离线期间的消息*/
options.setCleanSession(false);
EmqxClient.client=client;
client.setCallback(emqxCallback);
clientConnect(options,client);
} catch (Exception e) {
e.printStackTrace();
public synchronized void connect(){
/*设置配置*/
if (options == null){
setOptions();
}
if (client == null){
createClient();
}
while (!client.isConnected()){
try {
IMqttToken token = client.connect(options);
token.waitForCompletion();
}catch (Exception e){
logger.error("=====>>>>>mqtt连接失败 message={}",e.getMessage());
throw new ServiceException("mqtt客户端连接错误"+e.getMessage());
}
}
}
/**
* 10秒重连一次
* @param options
* @param client
* @throws InterruptedException
* 创建客户端
*/
public void clientConnect(MqttConnectOptions options, MqttClient client) throws InterruptedException {
try {
client.connect(options);
logger.info("mqtt连接成功");
// 订阅主题
emqxService.subscribe(client);
} catch (Exception e) {
logger.error("mqtt连接失败,"+e.getMessage());
//发生错误后重新连接
Thread.sleep(10000);
clientConnect(options,client);
private void createClient(){
if (client == null){
try {
/*host为主机名clientId是连接MQTT的客户端IDMemoryPersistence设置clientId的保存方式
默认是以内存方式保存*/
client = new MqttAsyncClient(hostname,clientId,new MemoryPersistence());
//设置回调函数
client.setCallback(emqxCallback);
logger.debug("====>>>mqtt客户端启动成功");
}catch (MqttException e){
logger.error("mqtt客户端连接错误 error={}",e.getMessage());
throw new ServiceException("mqtt客户端连接错误"+e.getMessage());
}
}
}
/**
* 设置连接属性
*/
private void setOptions(){
if (options != null){
options = null;
}
options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
//设置自动重新连接
options.setAutomaticReconnect(true);
/*设置为false断开连接不清除session重连后还是原来的session
保留订阅的主题,能接收离线期间的消息*/
options.setCleanSession(clearSession);
logger.debug("====>>>>设置mqtt参数成功");
}
/**
* 断开与mqtt的连接
*/
public synchronized void disconnect(){
//判断客户端是否null 是否连接
if (client != null && client.isConnected()){
try {
IMqttToken token = client.disconnect();
token.waitForCompletion();
}catch (MqttException e){
logger.error("====>>>>断开mqtt连接发生错误 message={}",e.getMessage());
throw new ServiceException("断开mqtt连接发生错误" + e.getMessage());
}
}
client = null;
}
/**
* 重新连接MQTT
*/
public synchronized void refresh(){
disconnect();
setOptions();
createClient();
connect();
}
/**
* 发布
* @param qos 连接方式
@@ -89,32 +166,39 @@ public class EmqxClient {
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mTopic = EmqxClient.client.getTopic(topic);
if (null == mTopic) {
logger.error("topic not exist");
}
MqttDeliveryToken token;
try {
token = mTopic.publish(message);
IMqttDeliveryToken token = client.publish(topic,message);
token.waitForCompletion();
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
logger.error("=======>>>>>发布主题时发生错误 topic={},message={}",topic,e.getMessage());
}
}
/**
* 订阅某个主题
* @param topic 主题
* @param qos 连接方式
* @param qos 消息质量
* Qos1消息发送一次不确保
* Qos2至少分发一次服务器确保接收消息进行确认
* Qos3只分发一次确保消息送达和只传递一次
*/
public void subscribe(String topic, int qos) {
logger.info("订阅主题" + topic);
public void subscribe(String topic, int qos){
logger.info("=======>>>>>订阅主题 topic={}",topic);
try {
EmqxClient.client.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
IMqttToken token = client.subscribe(topic, qos);
token.waitForCompletion();
}catch (MqttException e){
logger.error("=======>>>>>订阅主题 topic={} 失败 message={}",topic,e.getMessage());
}
}
/**是否处于连接状态*/
public boolean isConnected(){
return client != null && client.isConnected();
}
public String getClientId() {return clientId;};
}

View File

@@ -10,12 +10,14 @@ import com.ruoyi.iot.model.ThingsModels.ThingsModelValueRemarkItem;
import com.ruoyi.iot.model.ThingsModels.ThingsModelValuesInput;
import com.ruoyi.iot.service.IDeviceLogService;
import com.ruoyi.iot.service.IDeviceService;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.List;
@@ -53,7 +55,7 @@ public class EmqxService {
String pPropertyTopic = "/property/get";
String pFunctionTopic = "/function/get";
public void subscribe(MqttClient client) throws MqttException {
public void subscribe(MqttAsyncClient client) throws MqttException {
// 订阅设备信息
client.subscribe(sInfoTopic, 1);
// 订阅时钟同步
@@ -71,7 +73,18 @@ public class EmqxService {
logger.info("mqtt订阅了设备信息和物模型主题");
}
public void subscribeCallback(String topic, MqttMessage mqttMessage) {
/**
* 消息回调方法
* @param topic 主题
* @param mqttMessage 消息体
*/
@Async
public void subscribeCallback(String topic, MqttMessage mqttMessage) throws InterruptedException {
/**测试线程池使用*/
logger.info("====>>>>线程名--{}",Thread.currentThread().getName());
/**模拟耗时操作*/
Thread.sleep(1000);
// subscribe后得到的消息会执行到这里面
String message = new String(mqttMessage.getPayload());
logger.info("接收消息主题 : " + topic);

View File

@@ -1,5 +1,6 @@
package com.ruoyi.iot.mqtt;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
@@ -14,10 +15,29 @@ import org.springframework.stereotype.Component;
public class EmqxStart implements ApplicationRunner {
@Autowired
private MqttConfig mqttConfig;
private EmqxClient emqxClient;
@Override
public void run(ApplicationArguments applicationArguments){
mqttConfig.EmqxClientStart();
public void run(ApplicationArguments applicationArguments) throws Exception{
emqxClient.connect();
/*模拟客户端发布主题,测试多线程处理订阅主题业务*/
/** JSONObject message = new JSONObject();
message.put("id",2);
message.put("value","hello");
message.put("remark","12");
Thread.sleep(10000);
new Thread(() ->{
for (int i = 0; i < 3000; i++) {
emqxClient.publish(0,false,"/2/D6329VL54419L1Y0/info/post",message.toJSONString());
//测试多线程性能
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
*/
}
}

View File

@@ -0,0 +1,25 @@
package com.ruoyi.iot.mqtt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqttClientConfiguration {
@Autowired
private MqttConfig mqttConfig;
@Bean
EmqxClient mqttClient(){
EmqxClient client = new EmqxClient(mqttConfig.getclientId(),
mqttConfig.getusername(),
mqttConfig.getpassword(),
mqttConfig.gethostUrl(),
mqttConfig.gettimeout(),
mqttConfig.getkeepalive(),
mqttConfig.isClearSession()
);
return client;
}
}

View File

@@ -18,9 +18,6 @@ public class MqttConfig {
private static final Logger logger = LoggerFactory.getLogger(MqttConfig.class);
@Autowired
private EmqxClient emqxClient;
/**
* 用户名
*/
@@ -50,6 +47,13 @@ public class MqttConfig {
*/
private int keepalive;
/**是否清除session*/
private boolean clearSession;
/**是否共享订阅*/
private boolean isShared;
/**分组共享订阅*/
private boolean isSharedGroup;
public String getusername()
{
@@ -93,9 +97,27 @@ public class MqttConfig {
}
public void setkeepalive(int keepalive) {this.keepalive = keepalive;}
// @Bean
public void EmqxClientStart() {
logger.info("mqtt启动中...");
emqxClient.connect(gethostUrl(), getclientId(), getusername(), getpassword(), gettimeout(), getkeepalive());
public boolean isClearSession() {
return clearSession;
}
public void setClearSession(boolean clearSession) {
this.clearSession = clearSession;
}
public boolean isShared() {
return isShared;
}
public void setShared(boolean shared) {
isShared = shared;
}
public boolean isSharedGroup() {
return isSharedGroup;
}
public void setSharedGroup(boolean sharedGroup) {
isSharedGroup = sharedGroup;
}
}

View File

@@ -0,0 +1,22 @@
package com.ruoyi.iot.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
@Component
public class IotAsyncExceptionHander implements AsyncUncaughtExceptionHandler {
private static final Logger logger = LoggerFactory.getLogger(IotAsyncExceptionHander.class);
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
logger.debug("=======>>>>>捕获线程异常信息");
logger.error("======>>>>>> error message-{},method-name",ex.getMessage(),method.getName());
}
}

View File

@@ -0,0 +1,51 @@
package com.ruoyi.iot.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@ComponentScan("com.ruoyi.iot")
//注解开启异步任务支持
@EnableAsync
public class TaskConfigurer implements AsyncConfigurer {
private static final Logger logger = LoggerFactory.getLogger(TaskConfigurer.class);
@Autowired
private TaskExecutorConfig config;
@Bean
TaskExecutorConfig config(){
return new TaskExecutorConfig();
}
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(config.getCoreSize());
taskExecutor.setMaxPoolSize(config.getMaxSize());
taskExecutor.setQueueCapacity(config.getQueueCapacity());
taskExecutor.setThreadNamePrefix("iot-mqtt-");
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
taskExecutor.initialize(); //数据初始化
logger.debug("=====>>>>taskExecutor init");
return taskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return AsyncConfigurer.super.getAsyncUncaughtExceptionHandler();
}
}

View File

@@ -0,0 +1,41 @@
package com.ruoyi.iot.task;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties(prefix = "spring.task.execution.pool")
public class TaskExecutorConfig {
private int coreSize;
private int maxSize;
private int queueCapacity;
public int getCoreSize() {
return coreSize;
}
public void setCoreSize(int coreSize) {
this.coreSize = coreSize;
}
public int getMaxSize() {
return maxSize;
}
public void setMaxSize(int maxSize) {
this.maxSize = maxSize;
}
public int getQueueCapacity() {
return queueCapacity;
}
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
}