使用Redis实现延时任务的解决方案
最近在生产环境刚好遇到了延时任务的场景,调研了一下目前主流的方案,分析了一下优劣并且敲定了最终的方案。这篇文章记录了调研的过程,以及初步方案的实现。
候选方案对比
下面是想到的几种实现延时任务的方案,总结了一下相应的优势和劣势。
方案 | 优势 | 劣势 | 选用场景 |
---|---|---|---|
JDK 内置的延迟队列 DelayQueue |
实现简单 | 数据内存态,不可靠 | 一致性相对低的场景 |
调度框架和 MySQL 进行短间隔轮询 |
实现简单,可靠性高 | 存在明显的性能瓶颈 | 数据量较少实时性相对低的场景 |
RabbitMQ 的 DLX 和 TTL ,一般称为 死信队列 方案 |
异步交互可以削峰 | 延时的时间长度不可控,如果数据需要持久化则性能会降低 | - |
调度框架和 Redis 进行短间隔轮询 |
数据持久化,高性能 | 实现难度大 | 常见于支付结果回调方案 |
时间轮 | 实时性高 | 实现难度大,内存消耗大 | 实时性高的场景 |
如果应用的数据量不高,实时性要求比较低,选用调度框架和 MySQL
进行短间隔轮询这个方案是最优的方案。但是笔者遇到的场景数据量相对比较大,实时性并不高,采用扫库的方案一定会对 MySQL
实例造成比较大的压力。记得很早之前,看过一个PPT叫《盒子科技聚合支付系统演进》,其中里面有一张图片给予笔者一点启发:
里面刚好用到了调度框架和 Redis
进行短间隔轮询实现延时任务的方案,不过为了分摊应用的压力,图中的方案还做了分片处理。鉴于笔者当前业务紧迫,所以在第一期的方案暂时不考虑分片,只做了一个简化版的实现。
由于PPT中没有任何的代码或者框架贴出,有些需要解决的技术点需要自行思考,下面会重现一次整个方案实现的详细过程。
场景设计
实际的生产场景是笔者负责的某个系统需要对接一个外部的资金方,每一笔资金下单后需要延时30分钟推送对应的附件。这里简化为一个订单信息数据延迟处理的场景,就是每一笔下单记录一条订单消息(暂时叫做 OrderMessage
),订单消息需要延迟5到15秒后进行异步处理。
否决的候选方案实现思路
下面介绍一下其它四个不选用的候选方案,结合一些伪代码和流程分析一下实现过程。
JDK内置延迟队列
DelayQueue
是一个阻塞队列的实现,它的队列元素必须是 Delayed
的子类,这里做个简单的例子:
public class DelayQueueMain {
private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);
public static void main(String[] args) throws Exception {
DelayQueue<OrderMessage> queue = new DelayQueue<>();
// 默认延迟5秒
OrderMessage message = new OrderMessage("ORDER_ID_10086");
queue.add(message);
// 延迟6秒
message = new OrderMessage("ORDER_ID_10087", 6);
queue.add(message);
// 延迟10秒
message = new OrderMessage("ORDER_ID_10088", 10);
queue.add(message);
ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setName("DelayWorker");
thread.setDaemon(true);
return thread;
});
LOGGER.info("开始执行调度线程...");
executorService.execute(() -> {
while (true) {
try {
OrderMessage task = queue.take();
LOGGER.info("延迟处理订单消息,{}", task.getDescription());
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
});
Thread.sleep(Integer.MAX_VALUE);
}
private static class OrderMessage implements Delayed {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final long DELAY_MS = 1000L * 5;
private final String orderId;
private final long timestamp;
private final long expire;
private final String description;
public OrderMessage(String orderId, long expireSeconds) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
this.expire = this.timestamp + expireSeconds * 1000L;
this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s", orderId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
}
public OrderMessage(String orderId) {
this.orderId = orderId;
this.timestamp = System.currentTimeMillis();
this.expire = this.timestamp + DELAY_MS;
this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s", orderId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
}
public String getOrderId() {
return orderId;
}
public long getTimestamp() {
return timestamp;
}
public long getExpire() {
return expire;
}
public String getDescription() {
return description;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
}
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
使用Redis实现延时任务的解决方案
下载Word文档到电脑,方便收藏和打印~