1. 生产需求

  • 用户下订单后,15分钟未支付自动取消;
  • 用户成功下单支付后确认收货, 15天默认好评;

2. 实现思路

  • 利用redis的排序列表,ZSet进行需求实现, 下面是我的流程图和思路导线

3. 思路说明

  • 我们把Zset中的score当成时间戳, 这样我们就可以获得以时间戳排序的任务列表, 这我们通过score区间进行拉取任务,进行消费.

4. 代码封装实现

  • 首先是封装延时队列的工厂(完美契合Spring框架), 如果想要创建自己的特色延时队列则需要继承这个抽象工厂
package com.zjrcinfo.zjguahao.common.redis.delayqueue;
import com.zjrcinfo.zjguahao.common.redis.cluster.JedisClusterCache;
import com.zjrcinfo.zjguahao.common.utils.ThreadPoolUtil;
import com.zjrcinfo.zjguahao.common.web.log.LoggerName;
import org.apache.shiro.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import redis.clients.jedis.Tuple;

import javax.annotation.PostConstruct;
import java.util.Calendar;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 - Description: 延时队列工厂
 - User: zhouzhou
 - Date: 2019-09-26
 - Time: 14:32
 */
public abstract class AbstractDelayQueueMachineFactory {

    protected Logger logger = LoggerFactory.getLogger(LoggerName.KAFKA);

    @Autowired
    protected JedisClusterCache jedisClusterCache;

    /**
     * 插入任务id
     * @param jobId 任务id(队列内唯一)
     * @param time 延时时间(单位 :秒)
     * @return 是否插入成功
     */
    public boolean addJobId(String jobId,  Integer time) {
        Calendar instance = Calendar.getInstance();
        instance.add(Calendar.SECOND, time);
        long delaySeconds = instance.getTimeInMillis() / 1000;
        Long zadd = jedisClusterCache.zadd(setDelayQueueName(), delaySeconds, jobId);
        return zadd > 0;

    }

    private void startDelayQueueMachine() {
        logger.info(String.format("延时队列机器{%s}开始运作", setDelayQueueName()));

        // 发生异常捕获并且继续不能让战斗停下来
        while (true) {
            try {
                // 获取当前时间的时间戳
                long now = System.currentTimeMillis() / 1000;
                // 获取当前时间前的任务列表
                Set tuples = jedisClusterCache.zrangeByScoreWithScores(setDelayQueueName(), 0, now);
                // 如果不为空则遍历判断其是否满足取消要求
                if (!CollectionUtils.isEmpty(tuples)) {
                    for (Tuple tuple : tuples) {

                        String jobId = tuple.getElement();
                        Long num = jedisClusterCache.zrem(setDelayQueueName(), jobId);
                        // 如果移除成功, 则取消订单
                        if (num > 0) {
                            ThreadPoolUtil.execute(() ->invoke(jobId));
                        }
                    }
                }

            } catch (Exception e) {
                logger.warn(String.format("处理延时任务发生异常,异常原因为{%s}", e.getMessage()), e);
            } finally {
                // 间隔一秒钟搞一次
                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }

    }


    /**
     * 最终执行的任务方法
     * @param jobId 任务id
     */
    public abstract void invoke(String jobId);


    /**
     * 要实现延时队列的名字
     *
     */
    public abstract String setDelayQueueName();


    @PostConstruct
    public void init(){
        new Thread(this::startDelayQueueMachine).start();
    }

}
  • 测试延时队列的实现
/**
 * Description: 测试订单延时队列
 * User: zhouzhou
 * Date: 2019-09-26
 * Time: 15:14
 */
@Component
public class TestOrderDelayQueue extends AbstractDelayQueueMachineFactory {

    @Autowired
    private TestOrderDelayQueueService testOrderDelayQueueService;

    @Override
    public void invoke(String jobId) {
        testOrderDelayQueueService.cancelOrder(jobId);
    }

    @Override
    public String setDelayQueueName() {
        return "TestOrder";
    }

}
  • 具体延时消费的Service
package com.zjrcinfo.zjguahao.product.service.impl;

import org.springframework.stereotype.Service;

/**
 * Description:
 * User: zhouzhou
 * Date: 2019-09-26
 * Time: 15:21
 */
@Service
public class TestOrderDelayQueueService {


    public void cancelOrder(String orderNumber) {
        System.out.println(System.currentTimeMillis() + "ms:redis消费了一个任务:消费的订单OrderId为" + orderNumber);
    }

    public void cancelReg(String orderNumber) {
        System.out.println(System.currentTimeMillis() + "ms:redis消费了一个任务:消费的挂号订单OrderId为" + orderNumber);
    }
}
  • 测试用的Controller
package com.zjrcinfo.zjguahao.product.controller;
 
 
import com.zjrcinfo.zjguahao.common.web.log.LoggerName;
import com.zjrcinfo.zjguahao.product.service.delay.TestOrderDelayQueue;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
 
import java.util.Random;
 
/**
 * Description:
 * User: zhouzhou
 * Date: 2019-06-03
 * Time: 17:47
 */
@RestController
@Api("缓存测试")
@RequestMapping("/redis")
public class RedisTestController {
 
    private Logger logger = LoggerFactory.getLogger(LoggerName.REMOTE);
 
    @Autowired
    private TestOrderDelayQueue testOrderDelayQueue;
 
    // ------------------------  延时队列 -------------------
    @ApiOperation("添加定时orderId")
    @RequestMapping(value = "/addDelayOrder/{orderId}/{time}", method = RequestMethod.POST)
    public Object addZset(@PathVariable String orderId, @PathVariable Integer time) {
 
        boolean flag = testOrderDelayQueue.addJobId(orderId, time);
        return String.format("已经存入了订单id{%s},延时{%s}秒", orderId, time);
 
    }