开源版新增规则脚本

This commit is contained in:
kerwincui
2024-04-23 16:34:20 +08:00
parent de7e2529a3
commit 054e414b48
50 changed files with 7233 additions and 13 deletions

View File

@@ -5,7 +5,10 @@ import com.fastbee.common.core.mq.DeviceReportBo;
import com.fastbee.common.core.redis.RedisCache;
import com.fastbee.common.enums.ServerType;
import com.fastbee.common.utils.DateUtils;
import com.fastbee.common.utils.StringUtils;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.iot.ruleEngine.MsgContext;
import com.fastbee.iot.ruleEngine.RuleProcess;
import com.fastbee.mq.redischannel.producer.MessageProducer;
import com.fastbee.mq.service.IDeviceReportMessageService;
import com.fastbee.mqtt.annotation.Process;
@@ -25,6 +28,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
/**
* 客户端消息推送处理类
@@ -44,6 +49,9 @@ public class MqttPublish implements MqttHandler {
@Resource
private IDeviceReportMessageService deviceReportMessageService;
@Resource
private RuleProcess ruleProcess;
@Override
public void handler(ChannelHandlerContext ctx, MqttMessage message) {
MqttPublishMessage publishMessage = (MqttPublishMessage) message;
@@ -81,6 +89,7 @@ public class MqttPublish implements MqttHandler {
public void sendToMQ(MqttPublishMessage message) {
/*获取topic*/
String topicName = message.variableHeader().topicName();
byte[] source = ByteBufUtil.getBytes(message.content());
/*只处理上报数据*/
if (!topicName.endsWith(FastBeeConstant.MQTT.UP_TOPIC_SUFFIX)) {
return;
@@ -102,6 +111,13 @@ public class MqttPublish implements MqttHandler {
/*设备上报数据*/
reportBo.setReportType(1);
}
// 规则引擎脚本处理,完成后返回结果
MsgContext context = ruleProcess.processRuleScript(reportBo.getSerialNumber(),1, topicName, new String(source));
if (!Objects.isNull(context) && StringUtils.isNotEmpty(context.getPayload())
&& StringUtils.isNotEmpty(context.getTopic())) {
reportBo.setTopicName(context.getTopic());
reportBo.setData(context.getPayload().getBytes(StandardCharsets.UTF_8));
}
if (topicName.contains("property")) {
deviceReportMessageService.parseReportMsg(reportBo);
}

View File

@@ -24,6 +24,8 @@ import com.fastbee.iot.domain.FunctionLog;
import com.fastbee.iot.domain.Product;
import com.fastbee.iot.model.NtpModel;
import com.fastbee.iot.model.ThingsModels.PropertyDto;
import com.fastbee.iot.ruleEngine.MsgContext;
import com.fastbee.iot.ruleEngine.RuleProcess;
import com.fastbee.iot.service.IDeviceService;
import com.fastbee.iot.service.IProductService;
import com.fastbee.iot.service.IThingsModelService;
@@ -37,7 +39,6 @@ import com.fastbee.mq.service.IMqttMessagePublish;
import com.fastbee.mqtt.manager.MqttRemoteManager;
import com.fastbee.mqtt.model.PushMessageBo;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.protocols.IProtocol;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@@ -76,6 +77,9 @@ public class MqttMessagePublishImpl implements IMqttMessagePublish {
private JsonProtocolService jsonProtocolService;
private SnowflakeIdWorker snowflakeIdWorker = new SnowflakeIdWorker(3);
@Resource
private RuleProcess ruleProcess;
@Override
public InstructionsMessage buildMessage(DeviceDownMessage downMessage, TopicType type) {
@@ -145,19 +149,19 @@ public class MqttMessagePublishImpl implements IMqttMessagePublish {
}
/* 下发服务数据存储对象*/
FunctionLog log = new FunctionLog();
log.setCreateTime(DateUtils.getNowDate());
log.setFunValue(bo.getValue().get(bo.getIdentifier()).toString());
log.setMessageId(bo.getMessageId());
log.setSerialNumber(bo.getSerialNumber());
log.setIdentify(bo.getIdentifier());
log.setShowValue(bo.getShowValue());
log.setFunType(1);
log.setModelName(bo.getModelName());
FunctionLog funcLog = new FunctionLog();
funcLog.setCreateTime(DateUtils.getNowDate());
funcLog.setFunValue(bo.getValue().get(bo.getIdentifier()).toString());
funcLog.setMessageId(bo.getMessageId());
funcLog.setSerialNumber(bo.getSerialNumber());
funcLog.setIdentify(bo.getIdentifier());
funcLog.setShowValue(bo.getShowValue());
funcLog.setFunType(1);
funcLog.setModelName(bo.getModelName());
//兼容子设备
if (null != bo.getSlaveId()) {
PropertyDto thingModels = thingsModelService.getSingleThingModels(bo.getProductId(), bo.getIdentifier() + "#" + bo.getSlaveId());
log.setSerialNumber(bo.getSerialNumber() + "_" + bo.getSlaveId());
funcLog.setSerialNumber(bo.getSerialNumber() + "_" + bo.getSlaveId());
bo.setCode(ModbusCode.Write06);
if (!Objects.isNull(thingModels.getCode())){
bo.setCode(ModbusCode.getInstance(Integer.parseInt(thingModels.getCode())));
@@ -182,8 +186,17 @@ public class MqttMessagePublishImpl implements IMqttMessagePublish {
case MQTT:
//组建下发服务指令
InstructionsMessage instruction = buildMessage(downMessage, TopicType.FUNCTION_GET);
mqttClient.publish(instruction.getTopicName(), instruction.getMessage(), log);
MqttMessagePublishImpl.log.debug("=>服务下发,topic=[{}],指令=[{}]", instruction.getTopicName(),new String(instruction.getMessage()));
// 规则引擎脚本处理,完成后返回结果
MsgContext context = ruleProcess.processRuleScript(bo.getSerialNumber(), 2,instruction.getTopicName(),new String(instruction.getMessage()));
if (!Objects.isNull(context) && StringUtils.isNotEmpty(context.getPayload())
&& StringUtils.isNotEmpty(context.getTopic())) {
instruction.setTopicName(context.getTopic());
instruction.setMessage(context.getPayload().getBytes());
}
mqttClient.publish(instruction.getTopicName(), instruction.getMessage(), funcLog);
log.debug("=>服务下发,topic=[{}],指令=[{}]", instruction.getTopicName(),new String(instruction.getMessage()));
break;
}