枚举

package org.jeecg.modules.redisson.delay;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.util.concurrent.TimeUnit;

/**
 * Catalogue of the business delay queues backed by Redis.
 *
 * <p>Each constant bundles the Redis key of the queue, a display name, the delay applied to
 * every element, and the Spring bean name of the handler that consumes expired elements.
 * All properties are immutable; the accessors are written out explicitly so the enum does not
 * depend on annotation processing.
 */
public enum RedisDelayQueueEnum {

    /**
     * Process execution record queue: elements become available 60 seconds after being offered.
     */
    PROCESS_EXECUTION_RECORD("PROCESS_EXECUTION_RECORD", "流程执行记录", 60, TimeUnit.SECONDS, "processExecutionRecordDelayConsumer");

    /**
     * Redis key of the delay queue.
     */
    private final String code;

    /**
     * Human-readable name of the queue.
     */
    private final String name;

    /**
     * Delay amount, expressed in {@link #timeUnit}.
     */
    private final long delay;

    /**
     * Unit of {@link #delay}.
     */
    private final TimeUnit timeUnit;

    /**
     * Name of the bean implementing the business logic for this queue;
     * it can be looked up through the Spring application context.
     */
    private final String beanName;

    RedisDelayQueueEnum(String code, String name, long delay, TimeUnit timeUnit, String beanName) {
        this.code = code;
        this.name = name;
        this.delay = delay;
        this.timeUnit = timeUnit;
        this.beanName = beanName;
    }

    /**
     * @return the Redis key of the delay queue
     */
    public String getCode() {
        return code;
    }

    /**
     * @return the human-readable queue name
     */
    public String getName() {
        return name;
    }

    /**
     * @return the delay amount, in units of {@link #getTimeUnit()}
     */
    public long getDelay() {
        return delay;
    }

    /**
     * @return the unit of {@link #getDelay()}
     */
    public TimeUnit getTimeUnit() {
        return timeUnit;
    }

    /**
     * @return the Spring bean name of the handler for this queue
     */
    public String getBeanName() {
        return beanName;
    }

}

工具类

package org.jeecg.modules.redisson.delay;

import cn.hutool.core.util.ReflectUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Helper around the Redisson client for producing to and consuming from delay queues.
 *
 * <p>Every queue is a blocking deque identified by its Redis key, wrapped by a Redisson
 * delayed queue on the producer side.
 */
@Slf4j
@Component
public class RedisDelayQueueUtil {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Offers a value to the delay queue described by {@code redisDelayQueueEnum}.
     *
     * <p>Before the value is offered, its {@code createTime} field is overwritten with the
     * current time through reflection.
     * NOTE(review): a payload type without a {@code createTime} field presumably makes the
     * reflective call fail, which ends up in the catch block below — confirm against Hutool.
     *
     * @param value               element to enqueue
     * @param redisDelayQueueEnum queue definition supplying the Redis key, delay and time unit
     * @param <T>                 type of the element
     * @throws RuntimeException if anything goes wrong while preparing or offering the element;
     *                          the original exception is attached as the cause
     */
    public <T> void addDelayQueue(T value, RedisDelayQueueEnum redisDelayQueueEnum) {
        try {
            ReflectUtil.setFieldValue(value, "createTime", new Date());
            RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(redisDelayQueueEnum.getCode());
            RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
            delayedQueue.offer(value, redisDelayQueueEnum.getDelay(), redisDelayQueueEnum.getTimeUnit());
            log.info("1 (添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", redisDelayQueueEnum.getCode(), value, redisDelayQueueEnum.getTimeUnit().toSeconds(redisDelayQueueEnum.getDelay()) + "秒");
        } catch (Exception e) {
            log.error("(添加延时队列失败) {" + e.getMessage() + "}", e);
            // Keep the root cause so callers can see why the enqueue failed.
            throw new RuntimeException("(添加延时队列失败)", e);
        }
    }

    /**
     * Takes the next element from the blocking deque stored under {@code queueCode},
     * blocking the calling thread until one is available.
     *
     * @param queueCode Redis key of the queue
     * @param <T>       element type expected by the caller; the value is not checked against it
     * @return the element taken from the deque
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public <T> T getDelayQueue(String queueCode) throws InterruptedException {
        RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueCode);
        return blockingDeque.take();
    }

    /**
     * Post-restart handling: offers a {@code null} element with zero delay to the delayed queue
     * of {@code queueCode}.
     *
     * <p>NOTE(review): presumably this re-activates delivery of elements that were already
     * pending before the application restarted — confirm how the Redisson version in use
     * treats a {@code null} element.
     *
     * @param queueCode Redis key of the queue
     */
    public void reDealMessage(String queueCode) {
        RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
        RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        delayedQueue.offer(null, 0, TimeUnit.SECONDS);
        log.info("Consumer startup completed");
    }
}

队列初始化启动

package org.jeecg.modules.redisson.delay;

import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.util.SpringContextUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Starts the delay queue listeners once the application has booted and stops them on shutdown.
 *
 * <p>One listener thread is created per {@link RedisDelayQueueEnum} constant. A listener blocks
 * on its queue and hands every received element to the handler bean named by the constant,
 * executing the handler on the {@code asyncRedissonDelayQueue} executor.
 **/
@Slf4j
@Component
public class RedisDelayQueueRunner implements CommandLineRunner {

    @Autowired
    private RedisDelayQueueUtil redisDelayQueueUtil;
    @Autowired
    private ExecutorService asyncRedissonDelayQueue;
    /** Set to {@code true} on shutdown; listener loops exit once they observe it. */
    private final AtomicBoolean stopFlag = new AtomicBoolean(false);
    /** Listener threads, kept so they can be interrupted on shutdown. */
    private final List<Thread> threads = new ArrayList<>();

    /**
     * Runs after the application has started.
     *
     * @param args incoming main method arguments
     */
    @Override
    public void run(String... args) {
        // Start the delay queue listeners.
        dealDelayMessage();
        // Send an empty message to every queue so consumption does not stay blocked after a restart.
        reDealMessage();

    }

    /**
     * Triggers the post-restart handling of {@link RedisDelayQueueUtil#reDealMessage(String)}
     * for every queue defined in {@link RedisDelayQueueEnum}.
     */
    private void reDealMessage() {
        for (RedisDelayQueueEnum queueEnum : RedisDelayQueueEnum.values()) {
            log.info("启动重启后处理队列线程{}", queueEnum.getCode());
            // After a restart, messages that were not processed yet are handled again.
            redisDelayQueueUtil.reDealMessage(queueEnum.getCode());
        }

    }

    /**
     * Shutdown hook: raises the stop flag, waits five seconds, interrupts every listener thread
     * and finally shuts the handler executor down.
     */
    @PreDestroy
    public void destroy() {
        // Ask the listener loops to stop.
        stopFlag.set(true);
        ThreadUtil.sleep(5, TimeUnit.SECONDS);
        // After 5 seconds, interrupt the listeners so that blocked ones are released.
        for (Thread thread : threads) {
            thread.interrupt();
        }

        // Shut down the handler thread pool.
        asyncRedissonDelayQueue.shutdown();
        log.warn("延迟队列处理线程已全部关闭");
    }

    /**
     * Creates and starts one listener thread per queue defined in {@link RedisDelayQueueEnum}.
     */
    private void dealDelayMessage() {
        for (RedisDelayQueueEnum queueEnum : RedisDelayQueueEnum.values()) {
            Thread thread = new Thread(() -> listen(queueEnum));
            threads.add(thread);
            thread.start();
        }
        log.info("(Redis延迟队列启动成功)");
    }

    /**
     * Listener loop for a single queue; runs until the stop flag is raised or the thread is
     * interrupted.
     *
     * @param queueEnum queue to listen on
     */
    private void listen(RedisDelayQueueEnum queueEnum) {
        log.info("启动监听队列线程{}", queueEnum.getCode());
        int count = 0;
        while (!stopFlag.get()) {
            try {
                Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());
                // Nothing to do for empty messages.
                if (value == null) {
                    continue;
                }
                // Resolve the handler bean for this queue from the Spring container.
                RedisDelayQueueHandle redisDelayQueueHandle = SpringContextUtils.getBeanInstanceByBeanId(queueEnum.getBeanName());
                asyncRedissonDelayQueue.execute(() -> redisDelayQueueHandle.execute(value));
                // Throttle: pause for 500 ms after every 60 dispatched messages.
                count++;
                if (count >= 60) {
                    ThreadUtil.sleep(500);
                    count = 0;
                }
            } catch (InterruptedException e) {
                // Interruption is the shutdown signal sent by destroy(): keep the interrupt
                // status and leave the loop instead of reporting it as a failure.
                Thread.currentThread().interrupt();
                log.info("Delay queue listener interrupted, stopping: {}", queueEnum.getCode());
                break;
            } catch (Exception e) {
                log.error("延迟队列队列线程错误,", e);
                // Back off for 10 seconds before retrying so a persistent failure
                // does not turn into a tight error loop.
                ThreadUtil.sleep(10000, TimeUnit.MILLISECONDS);
            }
        }
    }
}

创建线程池

package org.jeecg.config.pool;

import cn.hutool.core.thread.ExecutorBuilder;
import cn.hutool.core.thread.ThreadUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;

/**
 * Spring configuration that declares the thread pool used to run delay queue handlers.
 */
@Configuration
public class myThreadPollConfiguration {

    /**
     * Creates the executor registered under the bean name {@code asyncRedissonDelayQueue}.
     *
     * <p>The pool is configured with a core size of 4 and a maximum size of 20; its threads
     * come from a factory created with the name {@code asyncRedissonDelayQueue-thread-pool}.
     *
     * @return the configured executor service
     */
    @Bean(name = "asyncRedissonDelayQueue")
    public ExecutorService getAsyncRedissonDelayQueue() {
        final ExecutorBuilder poolBuilder = ExecutorBuilder.create();
        poolBuilder.setCorePoolSize(4);
        poolBuilder.setMaxPoolSize(20);
        poolBuilder.setThreadFactory(ThreadUtil.createThreadFactory("asyncRedissonDelayQueue-thread-pool"));
        return poolBuilder.build();
    }

}

创建订阅Handler

/**
 * Callback invoked for every element received from a delay queue.
 *
 * <p>The interface has exactly one abstract method, so it is marked as a functional interface
 * and can be implemented by a class, a lambda or a method reference.
 *
 * @param <T> type of the queue element the handler processes
 */
@FunctionalInterface
public interface RedisDelayQueueHandle<T> {

    /**
     * Processes one element taken from the delay queue.
     *
     * @param t the element to process
     */
    void execute(T t);
}

实现订阅Handler

package org.jeecg.modules.extbpm.listener.global;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.system.api.ISysIpdAPI;
import org.jeecg.common.util.SwishDbUtil;
import org.jeecg.modules.extbpm.listener.entity.ExecutionTmp;
import org.jeecg.modules.extbpm.process.service.IExtActProcessExecutionDetailService;
import org.jeecg.modules.redisson.delay.RedisDelayQueueHandle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Delay queue handler for process execution records: updates the execution record and then
 * triggers the DingTalk notification plan for each received element.
 */
@Component
@Slf4j
public class ProcessExecutionRecordDelayConsumer implements RedisDelayQueueHandle<ExecutionTmp> {
    @Autowired
    private IExtActProcessExecutionDetailService executionDetailService;
    @Autowired
    private ISysIpdAPI sysIpdAPI;

    /**
     * Handles one delayed execution record.
     *
     * <p>Switches the data source by the record's tenant, then performs the update and the
     * notification while holding a monitor keyed by the activity instance id.
     *
     * <p>TODO measure the execution time, size the thread pool core accordingly, and add rate limiting.
     *
     * @param executionTmp the execution record received from the delay queue
     */
    @Override
    public void execute(ExecutionTmp executionTmp) {
        log.info("流程执行记录更新|ProcessExecutionRecordService execute,{}", executionTmp.getActivityInstanceId());
        SwishDbUtil.swishDbByTenantId(executionTmp.getTenant(), "流程执行记录异步更新");

        // Equal strings share a single instance in the string constant pool, so the interned id
        // serves as one monitor per activity instance.
        // NOTE(review): an interned string is a JVM-wide object that unrelated code could also
        // lock on — consider a dedicated lock registry.
        final String lockKey = executionTmp.getActivityInstanceId().intern();
        synchronized (lockKey) {
            updateAndNotify(executionTmp);
        }
    }

    /**
     * Updates the execution record, then runs the DingTalk plan with the record converted to JSON.
     *
     * @param executionTmp the execution record being processed
     */
    private void updateAndNotify(ExecutionTmp executionTmp) {
        // Execution record
        executionDetailService.updateRecord(executionTmp);
        // DingTalk notification
        final Object payload = JSON.toJSON(executionTmp);
        sysIpdAPI.executeDingPlan((JSONObject) payload);
    }

}

发送者

// Producer example: enqueue a payload on the PROCESS_EXECUTION_RECORD delay queue.
// DelayDTO presumably stands for the payload instance to send — replace it with the real object.
redisDelayQueueUtil.addDelayQueue(DelayDTO, RedisDelayQueueEnum.PROCESS_EXECUTION_RECORD);