fix(规则脚本增加日志): 规则脚本增加日志

This commit is contained in:
zhuangpeng.li
2025-04-15 17:22:45 +08:00
parent 8939c78e37
commit 5fc141719a
17 changed files with 395 additions and 53 deletions

View File

@@ -5,9 +5,9 @@ import com.fastbee.common.enums.ServerType;
import com.fastbee.common.utils.DateUtils;
import com.fastbee.common.utils.StringUtils;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.iot.ruleEngine.MsgContext;
import com.fastbee.iot.ruleEngine.RuleProcess;
import com.fastbee.mqttclient.IEmqxMessageProducer;
import com.fastbee.ruleEngine.context.MsgContext;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;

View File

@@ -1,12 +1,10 @@
package com.fastbee.data.controller;
import com.fastbee.common.annotation.Anonymous;
import com.fastbee.common.annotation.Log;
import com.fastbee.common.core.controller.BaseController;
import com.fastbee.common.core.domain.AjaxResult;
import com.fastbee.common.core.page.TableDataInfo;
import com.fastbee.common.enums.BusinessType;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.common.utils.poi.ExcelUtil;
import com.fastbee.iot.domain.Device;
import com.fastbee.iot.model.DeviceRelateUserInput;

View File

@@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.fastbee</groupId>
<artifactId>fastbee-plugs</artifactId>
<version>3.8.5</version>
</parent>
<artifactId>fastbee-ruleEngine</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.yomahub</groupId>
<artifactId>liteflow-core</artifactId>
<version>${liteflow.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>${commons.text.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,20 @@
package com.fastbee.ruleEngine.config;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.concurrent.*;
public class MainExecutorBuilder implements ExecutorBuilder {
private ThreadFactory springThreadFactory = new CustomizableThreadFactory("liteflow-main-");
@Override
public ExecutorService buildExecutor() {
return new ThreadPoolExecutor(
10,
30,
5,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(1000),
springThreadFactory);
}
}

View File

@@ -0,0 +1,20 @@
package com.fastbee.ruleEngine.config;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.concurrent.*;
public class WhenExecutorBuilder implements ExecutorBuilder {
private ThreadFactory springThreadFactory = new CustomizableThreadFactory("liteflow-when-");
@Override
public ExecutorService buildExecutor() {
return new ThreadPoolExecutor(
10,
30,
5,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(1000),
springThreadFactory);
}
}

View File

@@ -0,0 +1,108 @@
package com.fastbee.ruleEngine.context;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.exception.NullParamException;
import com.yomahub.liteflow.log.LFLoggerManager;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.text.StringSubstitutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Slf4j
public class MsgContext {
private static Logger logger = LoggerFactory.getLogger("script");
/**
* 消息主题
*/
private String topic;
/**
* 消息内容
*/
private String payload;
/**
* 设备编号
*/
private String serialNumber;
/**
* 产品id
*/
private Long productId;
/**
* 协议编码
*/
private String protocolCode;
private ConcurrentHashMap<String, Object> dataMap = new ConcurrentHashMap<>();
public void printlog(String var1, Object... var2) {
String requestId = LFLoggerManager.getRequestId();
logger.info(StrUtil.isBlank(requestId) ? "" : StrUtil.format("[{}]:", new Object[]{LFLoggerManager.getRequestId()}) + var1, var2);
}
private <T> void putDataMap(String key, T t) {
if (this.dataMap == null) {
this.dataMap = new ConcurrentHashMap<>();
}
if (ObjectUtil.isNull(t)) {
throw new NullParamException("data can't accept null param");
} else {
this.dataMap.put(key, t);
}
}
public boolean hasData(String key) {
if (this.dataMap == null) {
return false;
}
return this.dataMap.containsKey(key);
}
public <T> T getData(String key) {
if (this.dataMap == null) {
return null;
}
return (T) this.dataMap.get(key);
}
public <T> void setData(String key, T t) {
this.putDataMap(key, t);
}
//"{ topic:${topic}, payload:${payload} }";
// 自定义占位符可在脚本中使用msgContext.setData("test":1);
// 然后通过${test}调用可在输出侧http body和 sql语句中传入该值
public String placeholders(String str) {
if (getData("topic") == null) {
setData("topic", getTopic());
}
if (getData("payload") == null) {
setData("payload", getPayload());
}
if (getData("serialNumber") == null) {
setData("serialNumber", getSerialNumber());
}
if (getData("productId") == null) {
setData("productId", getProductId());
}
if (getData("protocolCode") == null) {
setData("protocolCode", getProtocolCode());
}
StringSubstitutor substitutor = new StringSubstitutor(this.dataMap);
return substitutor.replace(str);
}
}

View File

@@ -0,0 +1,126 @@
package com.fastbee.ruleEngine.core;
import com.fastbee.ruleEngine.context.MsgContext;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@Component
@Slf4j
public class FlowLogExecutor {
private static final Logger script_logger = LoggerFactory.getLogger("script");
private static final Logger scene_logger = LoggerFactory.getLogger("scene");
@Resource
private FlowExecutor flowExecutor;
public LiteflowResponse execute2Resp(String chainId, Object param, Object... contextBeanArray) {
printContextBean(contextBeanArray);
LiteflowResponse response = flowExecutor.execute2Resp(chainId, param, contextBeanArray);
printResponse(response);
return response;
}
public LiteflowResponse execute2RespWithRid(String chainId, Object param, String requestId, Object... contextBeanArray) {
printContextBeanWithRid(requestId, contextBeanArray);
LiteflowResponse response = flowExecutor.execute2RespWithRid(chainId, param, requestId, contextBeanArray);
printResponseWithRid(requestId, response);
return response;
}
public LiteflowResponse execute2Future(String chainId, Object param, Object... contextBeanArray) throws ExecutionException, InterruptedException {
printContextBean(contextBeanArray);
Future<LiteflowResponse> future = flowExecutor.execute2Future(chainId, param, contextBeanArray);
LiteflowResponse response = future.get();
printResponse(response);
return response;
}
public LiteflowResponse execute2FutureWithRid(String chainId, Object param, String requestId, Object... contextBeanArray) throws ExecutionException, InterruptedException {
printContextBeanWithRid(requestId, contextBeanArray);
Future<LiteflowResponse> future = flowExecutor.execute2FutureWithRid(chainId, param, requestId, contextBeanArray);
LiteflowResponse response = future.get();
printResponseWithRid(requestId, response);
return response;
}
public void printContextBean(Object... contextBeanArray) {
log.info("=====+>规则引擎执行前,上下文变量值");
for (Object contextBean : contextBeanArray) {
log.info("上下文数值:{}", contextBean);
}
log.info("=====+>规则引擎正在执行......");
}
public void printContextBeanWithRid(String requestId, Object... contextBeanArray) {
Logger ruleLogger;
String[] parts = requestId.split("/");
if (parts.length >= 2) {
if (Objects.equals(parts[0], "script")) {
ruleLogger = script_logger;
} else if (Objects.equals(parts[0], "scene")) {
ruleLogger = scene_logger;
} else {
ruleLogger = log;
}
ruleLogger.info("[{}]=====+>规则引擎执行前,上下文变量值", requestId);
for (Object contextBean : contextBeanArray) {
ruleLogger.info("[{}]=====+>上下文数值:{}", requestId, contextBean);
}
ruleLogger.info("[{}]=====+>规则引擎正在执行......", requestId);
}
}
public void printResponse(LiteflowResponse response) {
if (!response.isSuccess()) {
Exception e = response.getCause();
log.error("=====+>报错信息:{}", e.getMessage(), e);
} else {
//步骤详情
// Map<String, List<CmpStep>> stepMap = response.getExecuteSteps();
// stepMap.forEach((k, v) -> {
// v.forEach((step) -> {
// log.info("步骤:{}({}),执行时间:{}", step.getNodeId(), step.getNodeName(), step.getTimeSpent());
// });
// });
//每各步骤执行时间
String stepStr = response.getExecuteStepStrWithTime();
log.info("=====+>步骤:{}", stepStr);
}
}
public void printResponseWithRid(String requestId, LiteflowResponse response) {
Logger ruleLogger;
String[] parts = requestId.split("/");
if (parts.length >= 2) {
if (Objects.equals(parts[0], "script")) {
ruleLogger = script_logger;
MsgContext msgContext = response.getContextBean(MsgContext.class);
if (msgContext != null) {
ruleLogger.info("[{}]=====+>执行后msgContext{}", requestId, msgContext);
}
} else if (Objects.equals(parts[0], "scene")) {
ruleLogger = scene_logger;
} else {
ruleLogger = log;
}
if (!response.isSuccess()) {
Exception e = response.getCause();
ruleLogger.error("[{}]=====+>报错信息:{}", requestId, e.toString());
} else {
//每各步骤执行时间
String stepStr = response.getExecuteStepStrWithTime();
ruleLogger.info("[{}]=====+>步骤:{}", requestId, stepStr);
ruleLogger.info("[{}]=====+>执行完成!", requestId);
}
}
}
}

View File

@@ -0,0 +1,29 @@
package com.fastbee.ruleEngine.core;
import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.IdUtil;
import com.fastbee.ruleEngine.util.Constant;
public class RequestIdBuilder {
public static String buildSnowflakeRequestId() {
Snowflake snowflake = IdUtil.createSnowflake(1, 1);
return Constant.REQUEST_ID_SNOWFLAKE_PREFIX + snowflake.nextId();
}
public static String buildDeviceRequestId(String serialNumber) {
return Constant.REQUEST_ID_PREFIX + serialNumber;
}
public static String buildSceneRequestId(String sceneId) {
return Constant.REQUEST_ID_PREFIX + sceneId;
}
public static String buildProductRequestId(Long product, String serialNumber) {
return Constant.REQUEST_ID_PREFIX + product + Constant.REQUEST_ID_SPLIT + serialNumber;
}
public static String buildALLRequestId(String serialNumber, String product, String sceneId) {
return Constant.REQUEST_ID_PREFIX + product + Constant.REQUEST_ID_SPLIT + serialNumber + Constant.REQUEST_ID_SPLIT + sceneId;
}
}

View File

@@ -0,0 +1,11 @@
package com.fastbee.ruleEngine.util;
public class Constant {
public static final String REQUEST_ID_SNOWFLAKE_PREFIX = "S";
public static final String REQUEST_ID_PREFIX = "R";
public static final String REQUEST_ID_SPLIT = "_";
// D=数据流A=执行动作T=触发器
public static final String SCRIPT_DATA_PREFIX = "D";
public static final String SCRIPT_ACTION_PREFIX = "A";
public static final String SCRIPT_T_PREFIX = "T";
}

View File

@@ -17,6 +17,7 @@
<module>fastbee-generator</module>
<module>fastbee-http</module>
<module>fastbee-mqtt-client</module>
<module>fastbee-ruleEngine</module>
</modules>
<dependencies>

View File

@@ -7,7 +7,6 @@ import com.fastbee.common.enums.ServerType;
import com.fastbee.common.utils.DateUtils;
import com.fastbee.common.utils.StringUtils;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.iot.ruleEngine.MsgContext;
import com.fastbee.iot.ruleEngine.RuleProcess;
import com.fastbee.mq.redischannel.producer.MessageProducer;
import com.fastbee.mq.service.IDeviceReportMessageService;
@@ -20,6 +19,7 @@ import com.fastbee.mqtt.model.ClientMessage;
import com.fastbee.mqtt.service.IMessageStore;
import com.fastbee.base.session.Session;
import com.fastbee.base.util.AttributeUtils;
import com.fastbee.ruleEngine.context.MsgContext;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.*;

View File

@@ -28,7 +28,6 @@ import com.fastbee.iot.domain.FunctionLog;
import com.fastbee.iot.domain.Product;
import com.fastbee.iot.model.NtpModel;
import com.fastbee.iot.model.ThingsModels.PropertyDto;
import com.fastbee.iot.ruleEngine.MsgContext;
import com.fastbee.iot.ruleEngine.RuleProcess;
import com.fastbee.iot.service.IDeviceService;
import com.fastbee.iot.service.IFunctionLogService;
@@ -42,6 +41,7 @@ import com.fastbee.mq.service.IMqttMessagePublish;
import com.fastbee.mqtt.manager.MqttRemoteManager;
import com.fastbee.mqtt.model.PushMessageBo;
import com.fastbee.mqttclient.PubMqttClient;
import com.fastbee.ruleEngine.context.MsgContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

View File

@@ -120,7 +120,10 @@
<artifactId>liteflow-script-groovy</artifactId>
<version>${liteflow.version}</version>
</dependency>
<dependency>
<groupId>com.fastbee</groupId>
<artifactId>fastbee-ruleEngine</artifactId>
</dependency>
</dependencies>

View File

@@ -1,36 +0,0 @@
package com.fastbee.iot.ruleEngine;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MsgContext {
private static Logger logger = LoggerFactory.getLogger("script");
/** 消息主题 */
private String topic;
/** 消息内容 */
private String payload;
/**
* 设备编号
*/
private String serialNumber;
/**
* 产品id
*/
private Long productId;
/**
* 协议编码
*/
private String protocolCode;
}

View File

@@ -7,8 +7,9 @@ import com.fastbee.iot.model.ProductCode;
import com.fastbee.iot.model.ScriptCondition;
import com.fastbee.iot.service.IProductService;
import com.fastbee.iot.service.IScriptService;
import com.fastbee.ruleEngine.context.MsgContext;
import com.fastbee.ruleEngine.core.FlowLogExecutor;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -27,7 +28,7 @@ import java.util.Objects;
public class RuleProcess {
@Resource
private FlowExecutor flowExecutor;
private FlowLogExecutor flowLogExecutor;
@Resource
private IScriptService scriptService;
@Resource
@@ -55,20 +56,29 @@ public class RuleProcess {
scriptCondition.setScriptEvent(event); // 事件 1=设备上报 2=平台下发 3=设备上线 4=设备下线
scriptCondition.setScriptPurpose(1); // 脚本用途:数据流=1
String[] scriptIds = scriptService.selectRuleScriptIdArray(scriptCondition);
MsgContext context = new MsgContext(topic, payload, serialNumber, productCode.getProductId(), productCode.getProtocolCode());
MsgContext context = MsgContext.builder()
.serialNumber(serialNumber)
.productId(productCode.getProductId())
.protocolCode(productCode.getProtocolCode())
.payload(payload)
.topic(topic)
.build();
//如果查询不到脚本,则认为是不用处理
if (Objects.isNull(scriptIds) || scriptIds.length == 0) {
return new MsgContext();
}
// 动态构造Chain和EL表达式
String el = String.join(",", scriptIds); // THENa,b,c,d
LiteFlowChainELBuilder.createChain().setChainName("dataChain").setEL("THEN(" + el + ")").build();
for (String script : scriptIds) {
String eChainName = "dataChain_" + script;
String requestId = "script/" + script;
String el = "THEN(" + script + ")";
LiteFlowChainELBuilder.createChain().setChainName(eChainName).setEL(el).build();
// 执行规则脚本
LiteflowResponse response = flowExecutor.execute2Resp("dataChain", null, context);
LiteflowResponse response = flowLogExecutor.execute2RespWithRid(eChainName, null, requestId, context);
if (!response.isSuccess()) {
log.error("规则脚本执行发生错误:" + response.getMessage());
}
}
return context;
}

View File

@@ -30,7 +30,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

View File

@@ -46,6 +46,7 @@
<lock4j.version>2.2.3</lock4j.version>
<easyexcel.version>3.3.1</easyexcel.version>
<liteflow.version>2.12.2</liteflow.version>
<commons.text.version>1.10.0</commons.text.version>
</properties>
<!-- 依赖声明 -->
@@ -343,6 +344,12 @@
<version>${fastbee.version}</version>
</dependency>
<dependency>
<groupId>com.fastbee</groupId>
<artifactId>fastbee-ruleEngine</artifactId>
<version>${fastbee.version}</version>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId> <!-- use mapstruct-jdk8 for Java 8 or higher -->