1.添加异步客户端MqttAsyncClient,创建client加锁,确保client唯一

2.添加线程池处理接收主题的业务事件
This commit is contained in:
guanshubiao
2022-03-22 16:06:36 +08:00
parent 36f98dc3cb
commit 940a9eda63
11 changed files with 370 additions and 78 deletions

View File

@@ -0,0 +1,22 @@
package com.ruoyi.iot.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
@Component
public class IotAsyncExceptionHander implements AsyncUncaughtExceptionHandler {
private static final Logger logger = LoggerFactory.getLogger(IotAsyncExceptionHander.class);
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
logger.debug("=======>>>>>捕获线程异常信息");
logger.error("======>>>>>> error message-{},method-name",ex.getMessage(),method.getName());
}
}

View File

@@ -0,0 +1,51 @@
package com.ruoyi.iot.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@ComponentScan("com.ruoyi.iot")
//注解开启异步任务支持
@EnableAsync
public class TaskConfigurer implements AsyncConfigurer {
private static final Logger logger = LoggerFactory.getLogger(TaskConfigurer.class);
@Autowired
private TaskExecutorConfig config;
@Bean
TaskExecutorConfig config(){
return new TaskExecutorConfig();
}
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(config.getCoreSize());
taskExecutor.setMaxPoolSize(config.getMaxSize());
taskExecutor.setQueueCapacity(config.getQueueCapacity());
taskExecutor.setThreadNamePrefix("iot-mqtt-");
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
taskExecutor.initialize(); //数据初始化
logger.debug("=====>>>>taskExecutor init");
return taskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return AsyncConfigurer.super.getAsyncUncaughtExceptionHandler();
}
}

View File

@@ -0,0 +1,41 @@
package com.ruoyi.iot.task;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties(prefix = "spring.task.execution.pool")
public class TaskExecutorConfig {
private int coreSize;
private int maxSize;
private int queueCapacity;
public int getCoreSize() {
return coreSize;
}
public void setCoreSize(int coreSize) {
this.coreSize = coreSize;
}
public int getMaxSize() {
return maxSize;
}
public void setMaxSize(int maxSize) {
this.maxSize = maxSize;
}
public int getQueueCapacity() {
return queueCapacity;
}
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
}