Merge branch 'master' into Bulbasaur

# Conflicts:
#	consumer/src/main/java/cn/lili/timetask/handler/impl/promotion/PromotionEverydayExecute.java
#	consumer/src/main/java/cn/lili/trigger/executor/PromotionTimeTriggerExecutor.java
#	framework/src/main/java/cn/lili/common/trigger/delay/AbstractDelayQueueMachineFactory.java
#	framework/src/main/java/cn/lili/modules/order/order/serviceimpl/OrderServiceImpl.java
#	framework/src/main/java/cn/lili/modules/promotion/serviceimpl/CouponServiceImpl.java
This commit is contained in:
Chopper
2021-06-15 15:50:54 +08:00
32 changed files with 516 additions and 413 deletions

View File

@@ -0,0 +1,94 @@
package cn.lili.trigger;
import cn.hutool.json.JSONUtil;
import cn.lili.common.cache.Cache;
import cn.lili.common.utils.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.DefaultTypedTuple;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* 延时队列工厂
*
* @author paulG
* @since 2020/11/7
**/
@Slf4j
public abstract class AbstractDelayQueueListen {
@Autowired
private Cache cache;
/**
* 延时队列机器开始运作
*/
private void startDelayQueueMachine() {
log.info("延时队列机器{}开始运作", setDelayQueueName());
// 监听redis队列
while (true) {
try {
// 获取当前时间的时间戳
long now = System.currentTimeMillis() / 1000;
// 获取当前时间前需要执行的任务列表
Set<DefaultTypedTuple> tuples = cache.zRangeByScore(setDelayQueueName(), 0, now);
// 如果任务不为空
if (!CollectionUtils.isEmpty(tuples)) {
log.info("执行任务:{}", JSONUtil.toJsonStr(tuples));
for (DefaultTypedTuple tuple : tuples) {
String jobId = (String) tuple.getValue();
// 移除缓存,如果移除成功则表示当前线程处理了延时任务,则执行延时任务
Long num = cache.zRemove(setDelayQueueName(), jobId);
// 如果移除成功, 则执行
if (num > 0) {
ThreadPoolUtil.execute(() -> invoke(jobId));
}
}
}
} catch (Exception e) {
log.error("处理延时任务发生异常,异常原因为{}", e.getMessage(), e);
} finally {
// 间隔一秒钟搞一次
try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 最终执行的任务方法
*
* @param jobId 任务id
*/
public abstract void invoke(String jobId);
/**
* 要实现延时队列的名字
*/
public abstract String setDelayQueueName();
/**
* 监听队列
*/
@PostConstruct
public void init() {
new Thread(this::startDelayQueueMachine).start();
}
}

View File

@@ -2,9 +2,8 @@ package cn.lili.trigger;
import cn.hutool.json.JSONUtil;
import cn.lili.common.cache.Cache;
import cn.lili.common.trigger.interfaces.TimeTriggerExecutor;
import cn.lili.common.trigger.model.TimeTriggerMsg;
import cn.lili.common.trigger.util.TimeTriggerUtil;
import cn.lili.common.trigger.util.DelayQueueTools;
import cn.lili.common.utils.SpringContextUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
@@ -28,7 +27,7 @@ public class TimeTriggerConsumer implements RocketMQListener<TimeTriggerMsg> {
@Override
public void onMessage(TimeTriggerMsg timeTriggerMsg) {
try {
String key = TimeTriggerUtil.generateKey(timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getTriggerTime(), timeTriggerMsg.getUniqueKey());
String key = DelayQueueTools.generateKey(timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getTriggerTime(), timeTriggerMsg.getUniqueKey());
if (cache.get(key) == null) {
log.info("执行器执行被取消:{} | 任务标识:{}", timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getUniqueKey());

View File

@@ -0,0 +1,18 @@
package cn.lili.trigger;
/**
* 延时任务执行器接口
*
* @author Chopper
*/
public interface TimeTriggerExecutor {
/**
* 执行任务
*
* @param object 任务参数
*/
void execute(Object object);
}

View File

@@ -1,13 +1,12 @@
package cn.lili.trigger.executor;
import cn.hutool.json.JSONUtil;
import cn.lili.common.trigger.interfaces.TimeTrigger;
import cn.lili.trigger.TimeTriggerExecutor;
import cn.lili.common.trigger.message.PintuanOrderMessage;
import cn.lili.common.trigger.message.PromotionMessage;
import cn.lili.common.trigger.interfaces.TimeTrigger;
import cn.lili.common.trigger.interfaces.TimeTriggerExecutor;
import cn.lili.common.trigger.model.TimeExecuteConstant;
import cn.lili.common.trigger.model.TimeTriggerMsg;
import cn.lili.common.utils.DateUtil;
import cn.lili.config.rocketmq.RocketmqCustomProperties;
import cn.lili.modules.order.order.service.OrderService;
import cn.lili.modules.promotion.entity.enums.PromotionStatusEnum;
@@ -48,13 +47,21 @@ public class PromotionTimeTriggerExecutor implements TimeTriggerExecutor {
log.info("促销活动信息消费:{}", promotionMessage);
// 如果为促销活动开始,则需要发布促销活动结束的定时任务
if (PromotionStatusEnum.START.name().equals(promotionMessage.getPromotionStatus())) {
//设置活动关闭时间
setCloseTime(promotionMessage);
}
//更新促销活动状态
if (!promotionService.updatePromotionStatus(promotionMessage)) {
log.error("开始促销活动失败: {}", promotionMessage);
return;
if (!promotionService.updatePromotionStatus(promotionMessage)) {
log.error("开始促销活动失败: {}", promotionMessage);
return;
}
// 促销活动开始后,设置促销活动结束的定时任务
promotionMessage.setPromotionStatus(PromotionStatusEnum.END.name());
String uniqueKey = "{TIME_TRIGGER_" + promotionMessage.getPromotionType() + "}_" + promotionMessage.getPromotionId();
// 结束时间(延时一分钟)
long closeTime = promotionMessage.getEndTime().getTime() + 60000;
TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.PROMOTION_EXECUTOR, closeTime, promotionMessage, uniqueKey, rocketmqCustomProperties.getPromotionTopic());
//添加延时任务
timeTrigger.addDelay(timeTriggerMsg);
} else {
//不是开始,则修改活动状态
promotionService.updatePromotionStatus(promotionMessage);
}
return;
}
@@ -67,22 +74,5 @@ public class PromotionTimeTriggerExecutor implements TimeTriggerExecutor {
}
}
/**
* 设置促销活动结束时间
*
* @param promotionMessage 信息队列传输促销信息实体
*/
private void setCloseTime(PromotionMessage promotionMessage) {
//如果设置了活动结束时间则创建促销结束延时任务
if(promotionMessage.getEndTime()!=null){
// 促销活动开始后,设置促销活动结束的定时任务
promotionMessage.setPromotionStatus(PromotionStatusEnum.END.name());
String uniqueKey = "{TIME_TRIGGER_" + promotionMessage.getPromotionType() + "}_" + promotionMessage.getPromotionId();
// 结束时间(延时一分钟)
long closeTime = promotionMessage.getEndTime().getTime() + 60000;
TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(TimeExecuteConstant.PROMOTION_EXECUTOR, closeTime, promotionMessage, uniqueKey, rocketmqCustomProperties.getPromotionTopic());
timeTrigger.addDelay(timeTriggerMsg, DateUtil.getDelayTime(promotionMessage.getEndTime().getTime()));
}
}
}

View File

@@ -0,0 +1,34 @@
package cn.lili.trigger.listen;
import cn.hutool.json.JSONUtil;
import cn.lili.common.trigger.enums.DelayQueueEnums;
import cn.lili.common.trigger.interfaces.TimeTrigger;
import cn.lili.common.trigger.model.TimeTriggerMsg;
import cn.lili.trigger.AbstractDelayQueueListen;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* PromotionTimeTriggerListen
*
* @author Chopper
* @version v1.0
* 2021-06-11 10:47
*/
@Component
public class PromotionDelayQueueListen extends AbstractDelayQueueListen {
@Autowired
private TimeTrigger timeTrigger;
@Override
public void invoke(String jobId) {
timeTrigger.execute(JSONUtil.toBean(jobId, TimeTriggerMsg.class));
}
@Override
public String setDelayQueueName() {
return DelayQueueEnums.PROMOTION_QUEUE.name();
}
}