Redis实现延迟任务(过期取消订单)
AI-摘要
切换
Qi GPT
AI初始化中...
介绍自己
生成本文简介
推荐相关文章
前往主页
前往tianli博客
1. 生产需求
- 用户下订单后,15分钟未支付自动取消;
- 用户成功下单支付后确认收货, 15天默认好评;
2. 实现思路
- 利用redis的排序列表,ZSet进行需求实现, 下面是我的流程图和思路导线
3. 思路说明
- 我们把Zset中的score当成时间戳, 这样我们就可以获得以时间戳排序的任务列表, 这我们通过score区间进行拉取任务,进行消费.
4. 代码封装实现
- 首先是封装延时队列的工厂(完美契合Spring框架), 如果想要创建自己的特色延时队列则需要继承这个抽象工厂
package com.zjrcinfo.zjguahao.common.redis.delayqueue;
import com.zjrcinfo.zjguahao.common.redis.cluster.JedisClusterCache;
import com.zjrcinfo.zjguahao.common.utils.ThreadPoolUtil;
import com.zjrcinfo.zjguahao.common.web.log.LoggerName;
import org.apache.shiro.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import redis.clients.jedis.Tuple;
import javax.annotation.PostConstruct;
import java.util.Calendar;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
- Description: 延时队列工厂
- User: zhouzhou
- Date: 2019-09-26
- Time: 14:32
*/
public abstract class AbstractDelayQueueMachineFactory {
protected Logger logger = LoggerFactory.getLogger(LoggerName.KAFKA);
@Autowired
protected JedisClusterCache jedisClusterCache;
/**
* 插入任务id
* @param jobId 任务id(队列内唯一)
* @param time 延时时间(单位 :秒)
* @return 是否插入成功
*/
public boolean addJobId(String jobId, Integer time) {
Calendar instance = Calendar.getInstance();
instance.add(Calendar.SECOND, time);
long delaySeconds = instance.getTimeInMillis() / 1000;
Long zadd = jedisClusterCache.zadd(setDelayQueueName(), delaySeconds, jobId);
return zadd > 0;
}
private void startDelayQueueMachine() {
logger.info(String.format("延时队列机器{%s}开始运作", setDelayQueueName()));
// 发生异常捕获并且继续不能让战斗停下来
while (true) {
try {
// 获取当前时间的时间戳
long now = System.currentTimeMillis() / 1000;
// 获取当前时间前的任务列表
Set tuples = jedisClusterCache.zrangeByScoreWithScores(setDelayQueueName(), 0, now);
// 如果不为空则遍历判断其是否满足取消要求
if (!CollectionUtils.isEmpty(tuples)) {
for (Tuple tuple : tuples) {
String jobId = tuple.getElement();
Long num = jedisClusterCache.zrem(setDelayQueueName(), jobId);
// 如果移除成功, 则取消订单
if (num > 0) {
ThreadPoolUtil.execute(() ->invoke(jobId));
}
}
}
} catch (Exception e) {
logger.warn(String.format("处理延时任务发生异常,异常原因为{%s}", e.getMessage()), e);
} finally {
// 间隔一秒钟搞一次
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 最终执行的任务方法
* @param jobId 任务id
*/
public abstract void invoke(String jobId);
/**
* 要实现延时队列的名字
*
*/
public abstract String setDelayQueueName();
@PostConstruct
public void init(){
new Thread(this::startDelayQueueMachine).start();
}
}
- 测试延时队列的实现
/**
* Description: 测试订单延时队列
* User: zhouzhou
* Date: 2019-09-26
* Time: 15:14
*/
@Component
public class TestOrderDelayQueue extends AbstractDelayQueueMachineFactory {
@Autowired
private TestOrderDelayQueueService testOrderDelayQueueService;
@Override
public void invoke(String jobId) {
testOrderDelayQueueService.cancelOrder(jobId);
}
@Override
public String setDelayQueueName() {
return "TestOrder";
}
}
- 具体延时消费的Service
package com.zjrcinfo.zjguahao.product.service.impl;
import org.springframework.stereotype.Service;
/**
* Description:
* User: zhouzhou
* Date: 2019-09-26
* Time: 15:21
*/
@Service
public class TestOrderDelayQueueService {
public void cancelOrder(String orderNumber) {
System.out.println(System.currentTimeMillis() + "ms:redis消费了一个任务:消费的订单OrderId为" + orderNumber);
}
public void cancelReg(String orderNumber) {
System.out.println(System.currentTimeMillis() + "ms:redis消费了一个任务:消费的挂号订单OrderId为" + orderNumber);
}
}
- 测试用的Controller
package com.zjrcinfo.zjguahao.product.controller;
import com.zjrcinfo.zjguahao.common.web.log.LoggerName;
import com.zjrcinfo.zjguahao.product.service.delay.TestOrderDelayQueue;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.util.Random;
/**
* Description:
* User: zhouzhou
* Date: 2019-06-03
* Time: 17:47
*/
@RestController
@Api("缓存测试")
@RequestMapping("/redis")
public class RedisTestController {
private Logger logger = LoggerFactory.getLogger(LoggerName.REMOTE);
@Autowired
private TestOrderDelayQueue testOrderDelayQueue;
// ------------------------ 延时队列 -------------------
@ApiOperation("添加定时orderId")
@RequestMapping(value = "/addDelayOrder/{orderId}/{time}", method = RequestMethod.POST)
public Object addZset(@PathVariable String orderId, @PathVariable Integer time) {
boolean flag = testOrderDelayQueue.addJobId(orderId, time);
return String.format("已经存入了订单id{%s},延时{%s}秒", orderId, time);
}
- 感谢你赐予我前进的力量
赞赏者名单
因为你们的支持让我意识到写文章的价值🙏
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 云深不知处
评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果