枚举
package org.jeecg.modules.redisson.delay;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.util.concurrent.TimeUnit;
/**
* 延迟队列业务枚举
*/
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum RedisDelayQueueEnum {
/**
* 流程执行记录
*/
PROCESS_EXECUTION_RECORD("PROCESS_EXECUTION_RECORD", "流程执行记录", 60, TimeUnit.SECONDS, "processExecutionRecordDelayConsumer"),
/**
* 延迟队列 Redis Key
*/
private String code;
private String name;
private long delay;
private TimeUnit timeUnit;
/**
* 延迟队列具体业务实现的 Bean
* 可通过 Spring 的上下文获取
*/
private String beanName;
}
工具类
package org.jeecg.modules.redisson.delay;
import cn.hutool.core.util.ReflectUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class RedisDelayQueueUtil {
@Autowired
private RedissonClient redissonClient;
/**
* 添加延迟队列
*
* @param value 队列值
* @param redisDelayQueueEnum 队列枚举
* @param <T>
*/
public <T> void addDelayQueue(T value, RedisDelayQueueEnum redisDelayQueueEnum) {
try {
ReflectUtil.setFieldValue(value, "createTime", new Date());
RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(redisDelayQueueEnum.getCode());
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
delayedQueue.offer(value, redisDelayQueueEnum.getDelay(), redisDelayQueueEnum.getTimeUnit());
log.info("1 (添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", redisDelayQueueEnum.getCode(), value, redisDelayQueueEnum.getTimeUnit().toSeconds(redisDelayQueueEnum.getDelay()) + "秒");
} catch (Exception e) {
log.error("(添加延时队列失败) {" + e.getMessage() + "}", e);
throw new RuntimeException("(添加延时队列失败)");
}
}
/**
* 获取延迟队列
*
* @param queueCode
* @param <T>
* @return
* @throws InterruptedException
*/
public <T> T getDelayQueue(String queueCode) throws InterruptedException {
RBlockingDeque<Map> blockingDeque = redissonClient.getBlockingDeque(queueCode);
T value = (T) blockingDeque.take();
return value;
}
/**
* 项目重启后处理
*/
public void reDealMessage(String queueCode) {
RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
delayedQueue.offer(null, 0, TimeUnit.SECONDS);
log.info("Consumer startup completed");
}
}
队列初始化启动
package org.jeecg.modules.redisson.delay;
import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.util.SpringContextUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 延迟队列初始化启动
**/
@Slf4j
@Component
public class RedisDelayQueueRunner implements CommandLineRunner {
@Autowired
private RedisDelayQueueUtil redisDelayQueueUtil;
@Autowired
private ExecutorService asyncRedissonDelayQueue;
private final AtomicBoolean stopFlag = new AtomicBoolean(false);
private final List<Thread> threads = new ArrayList<>();
/**
* 项目启动后执行
*
* @param args incoming main method arguments
*/
@Override
public void run(String... args) {
// 启动延迟队列接收器
dealDelayMessage();
// 为每一个code发送一条空消息,防止出现重启后阻塞无法消费
reDealMessage();
}
private void reDealMessage() {
for (RedisDelayQueueEnum queueEnum : RedisDelayQueueEnum.values()) {
log.info("启动重启后处理队列线程" + queueEnum.getCode());
// 重启后,未处理的消息重新处理
redisDelayQueueUtil.reDealMessage(queueEnum.getCode());
}
}
@PreDestroy
public void destroy() {
// 设置标志位为 true
stopFlag.set(true);
ThreadUtil.sleep(5, TimeUnit.SECONDS);
// 5秒后,强制关闭关闭队列处理线程
for (Thread thread : threads) {
thread.interrupt();
}
// 关闭线程池
asyncRedissonDelayQueue.shutdown();
log.warn("延迟队列处理线程已全部关闭");
}
private void dealDelayMessage() {
// 使用枚举类来遍历所有队列类型
for (RedisDelayQueueEnum queueEnum : RedisDelayQueueEnum.values()) {
Thread thread = new Thread(() -> {
log.info("启动监听队列线程" + queueEnum.getCode());
int count = 0;
while (!stopFlag.get()) {
try {
Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());
// 如果为空则跳过执行
if (value == null) {
continue;
}
// 使用Spring容器获取对应队列处理器
RedisDelayQueueHandle redisDelayQueueHandle = SpringContextUtils.getBeanInstanceByBeanId(queueEnum.getBeanName());
asyncRedissonDelayQueue.execute(() -> redisDelayQueueHandle.execute(value));
count++;
if (count >= 60) {
ThreadUtil.sleep(500);
count = 0;
}
} catch (Exception e) {
log.error("延迟队列队列线程错误,", e);
ThreadUtil.sleep(10000, TimeUnit.MICROSECONDS);
}
}
});
threads.add(thread);
thread.start();
}
// 使用info级别输出启动成功信息
log.info("(Redis延迟队列启动成功)");
}
}
创建线程池
package org.jeecg.config.pool;
import cn.hutool.core.thread.ExecutorBuilder;
import cn.hutool.core.thread.ThreadUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
@Configuration
public class myThreadPollConfiguration {
/**
* asyncRedisson
*/
@Bean(name = "asyncRedissonDelayQueue")
public ExecutorService getAsyncRedissonDelayQueue() {
return ExecutorBuilder.create()
.setCorePoolSize(4)
.setMaxPoolSize(20)
.setThreadFactory(ThreadUtil.createThreadFactory("asyncRedissonDelayQueue-thread-pool"))
.build();
}
}
创建订阅Handler
public interface RedisDelayQueueHandle<T> {
void execute(T t);
}
实现订阅Handler
package org.jeecg.modules.extbpm.listener.global;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.system.api.ISysIpdAPI;
import org.jeecg.common.util.SwishDbUtil;
import org.jeecg.modules.extbpm.listener.entity.ExecutionTmp;
import org.jeecg.modules.extbpm.process.service.IExtActProcessExecutionDetailService;
import org.jeecg.modules.redisson.delay.RedisDelayQueueHandle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ProcessExecutionRecordDelayConsumer implements RedisDelayQueueHandle<ExecutionTmp> {
@Autowired
private IExtActProcessExecutionDetailService executionDetailService;
@Autowired
private ISysIpdAPI sysIpdAPI;
/**
* TODO 计算执行时间,设置线程池核心大小,并限速
*
* @param executionTmp
*/
@Override
public void execute(ExecutionTmp executionTmp) {
log.info("流程执行记录更新|ProcessExecutionRecordService execute,{}", executionTmp.getActivityInstanceId());
SwishDbUtil.swishDbByTenantId(executionTmp.getTenant(), "流程执行记录异步更新");
// 相同字符串,字符串常量池中只有一个
synchronized (executionTmp.getActivityInstanceId().intern()) {
// 执行记录
executionDetailService.updateRecord(executionTmp);
// 钉钉通知
sysIpdAPI.executeDingPlan((JSONObject) JSON.toJSON(executionTmp));
}
}
}
发送者
redisDelayQueueUtil.addDelayQueue(DelayDTO, RedisDelayQueueEnum.PROCESS_EXECUTION_RECORD);