我的编程空间,编程开发者的网络收藏夹
学习永远不晚

使用Redis实现延时任务的解决方案

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

使用Redis实现延时任务的解决方案

最近在生产环境刚好遇到了延时任务的场景,调研了一下目前主流的方案,分析了一下优劣并且敲定了最终的方案。这篇文章记录了调研的过程,以及初步方案的实现。

候选方案对比

下面是想到的几种实现延时任务的方案,总结了一下相应的优势和劣势。

方案 优势 劣势 选用场景
JDK 内置的延迟队列 DelayQueue 实现简单 数据内存态,不可靠 一致性相对低的场景
调度框架和 MySQL 进行短间隔轮询 实现简单,可靠性高 存在明显的性能瓶颈 数据量较少实时性相对低的场景
RabbitMQ 的 DLX 和 TTL,一般称为 死信队列 方案 异步交互可以削峰 延时的时间长度不可控,如果数据需要持久化则性能会降低 -
调度框架和 Redis 进行短间隔轮询 数据持久化,高性能 实现难度大 常见于支付结果回调方案
时间轮 实时性高 实现难度大,内存消耗大 实时性高的场景

如果应用的数据量不高,实时性要求比较低,选用调度框架和 MySQL 进行短间隔轮询这个方案是最优的方案。但是笔者遇到的场景数据量相对比较大,实时性并不高,采用扫库的方案一定会对 MySQL 实例造成比较大的压力。记得很早之前,看过一个PPT叫《盒子科技聚合支付系统演进》,其中里面有一张图片给予笔者一点启发:

使用Redis实现延时任务的解决方案

里面刚好用到了调度框架和 Redis 进行短间隔轮询实现延时任务的方案,不过为了分摊应用的压力,图中的方案还做了分片处理。鉴于笔者当前业务紧迫,所以在第一期的方案暂时不考虑分片,只做了一个简化版的实现。

由于PPT中没有任何的代码或者框架贴出,有些需要解决的技术点需要自行思考,下面会重现一次整个方案实现的详细过程。

场景设计

实际的生产场景是笔者负责的某个系统需要对接一个外部的资金方,每一笔资金下单后需要延时30分钟推送对应的附件。这里简化为一个订单信息数据延迟处理的场景,就是每一笔下单记录一条订单消息(暂时叫做 OrderMessage ),订单消息需要延迟5到15秒后进行异步处理。

使用Redis实现延时任务的解决方案

否决的候选方案实现思路

下面介绍一下其它四个不选用的候选方案,结合一些伪代码和流程分析一下实现过程。

JDK内置延迟队列

DelayQueue 是一个阻塞队列的实现,它的队列元素必须是 Delayed 的子类,这里做个简单的例子:


public class DelayQueueMain {
  private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);
  public static void main(String[] args) throws Exception {
    DelayQueue<OrderMessage> queue = new DelayQueue<>();
    // 默认延迟5秒
    OrderMessage message = new OrderMessage("ORDER_ID_10086");
    queue.add(message);
    // 延迟6秒
    message = new OrderMessage("ORDER_ID_10087", 6);
    queue.add(message);
    // 延迟10秒
    message = new OrderMessage("ORDER_ID_10088", 10);
    queue.add(message);
    ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
      Thread thread = new Thread(r);
      thread.setName("DelayWorker");
      thread.setDaemon(true);
      return thread;
    });
    LOGGER.info("开始执行调度线程...");
    executorService.execute(() -> {
      while (true) {
        try {
          OrderMessage task = queue.take();
          LOGGER.info("延迟处理订单消息,{}", task.getDescription());
        } catch (Exception e) {
          LOGGER.error(e.getMessage(), e);
        }
      }
    });
    Thread.sleep(Integer.MAX_VALUE);
  }
  private static class OrderMessage implements Delayed {
    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    
    private static final long DELAY_MS = 1000L * 5;
    
    private final String orderId;
    
    private final long timestamp;
    
    private final long expire;
    
    private final String description;
    public OrderMessage(String orderId, long expireSeconds) {
      this.orderId = orderId;
      this.timestamp = System.currentTimeMillis();
      this.expire = this.timestamp + expireSeconds * 1000L;
      this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s", orderId,
          LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
          LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
    }
    public OrderMessage(String orderId) {
      this.orderId = orderId;
      this.timestamp = System.currentTimeMillis();
      this.expire = this.timestamp + DELAY_MS;
      this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s", orderId,
          LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
          LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
    }
    public String getOrderId() {
      return orderId;
    }
    public long getTimestamp() {
      return timestamp;
    }
    public long getExpire() {
      return expire;
    }
    public String getDescription() {
      return description;
    }
    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
      return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
  }
}

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

使用Redis实现延时任务的解决方案

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Java处理延时任务的解决方案有哪些

本篇内容介绍了“Java处理延时任务的解决方案有哪些”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!数据库轮询原理通过一个线程定时的扫描数据库
2023-06-30

一文详解golang延时任务的实现

这篇文章主要为大家介绍了golang延时任务的实现示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-03-20

基于Redis实现延时队列的优化方案小结

目录一、延时队列的应用二、延时队列的实编程客栈现三、总结一、延时队列的应用近期在开发部门的新项目,其中有个关键功能就是智能推送,即根据用户行为在特定的时间点向用户推送相应的提醒消息,比如以下业务场景:在用户点击充值项后,半小时内未充值,
2022-07-05

Spring Boot使用Schedule实现定时任务的方法

这篇文章主要介绍了Spring Boot使用Schedule实现定时任务,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
2023-03-22

Java实现定时任务的方法详解

大家都用过闹钟,闹钟可以说是一种定时任务。那么,在 Java 中,如何实现这样的功能呢?即如何实现定时任务呢?本文就来详细和大家聊聊
2022-11-13

Redis 实现分布式锁时需要考虑的问题解决方案

目录引言第一部分:什么是分布式锁?1.1 分布式锁的定义1.2 分布式锁的特性1.3 分布式锁的应用场景第二部分:Redis 实现分布式锁的基本原理2.1 Redis 的原子性操作2.2 锁的自动释放机制2.3 Redis 分布式锁的基本流
Redis 实现分布式锁时需要考虑的问题解决方案
2024-09-29

编程热搜

目录