我的编程空间,编程开发者的网络收藏夹
学习永远不晚

SpringBootEvent事件如何实现异步延迟执行

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

SpringBootEvent事件如何实现异步延迟执行

SpringBoot Event 事件实现异步延迟执行

Spring的事件(Application Event)非常好用,虽然有一点会出现代码污染,但是在做不使用其他框架来做异步的情况先,还是非常方便的。

使用它只需要三样东西

  • 自定义事件:继承 ApplicationEvent,创建一个你想传的数据的对象,会在监听器那边收到该对象。
  • 定义监听器,实现 ApplicationListener 或者通过 @EventListener 注解到方法上,两种方式都行,但是推荐使用@EventListener,只要参数是你写的继承ApplicationEvent的对象,就会自动找到执行方法。
  • 定义发布者,通过 ApplicationEventPublisher,自带的bean,不需要单独声明,直接@Autowired就能使用,主要只需要publishEvent方法。

但是有时候我需要做延时执行,自带的功能缺不支持,但是我发现ApplicationEvent对象里面有两个成员变量,source和timestamp,构造函数(@since 5.3.8)也提供了同时注入这两个变量数据。

   
    public ApplicationEvent(Object source, Clock clock) {
        super(source);
        this.timestamp = clock.millis();
    }

但是,看了说明timestamp只是标志执行的时间,并不是为了延迟执行,可惜了。

于是查了一些资料,找到java.util.concurrent.DelayQueue对象,JDK自带了延迟的队列对象,我们可以考虑利用自带的timestamp和延迟队列DelayQueue结合一起来实现,具体DelayQueue的使用请自行查询,非常的简单。

首先,继承的ApplicationEvent重新实现一下。

不单单要继承ApplicationEvent,还需要实现Delayed,主要是因为DelayQueue队列中必须是Delayed的实现类

import java.time.Clock;
import java.time.Duration;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
 
import org.springframework.context.ApplicationEvent;
 
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
 
@Data
@EqualsAndHashCode(callSuper = false)
public class ApplicationDelayedEvent extends ApplicationEvent implements Delayed {
 
    private static final long serialVersionUID = 1L;
 
    public ApplicationDelayedEvent(Object source) {
        this(source, 0L);
    }
 
    public ApplicationDelayedEvent(Object source, long delaySeconds) {
        super(source, Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(delaySeconds)));
    }
 
    @Override
    public int compareTo(Delayed o) {
        // 最好用NANOSECONDS,更精确,但是用处不大
        long delta = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
        return (int) delta;
    }
 
    @Override
    public long getDelay(TimeUnit unit) {
        // 最好用NANOSECONDS,更精确,但是用处不大,负数也会认为到时间了
        long millis = this.getTimestamp();
        long currentTimeMillis = System.currentTimeMillis();
        long sourceDuration = millis - currentTimeMillis;
        return unit.convert(sourceDuration, unit);
    }
}

多了两个必须实现的方法,compareTo是排序,应该是队列中的顺序。

getDelay是主要的方法,目的是归0的时候会从DelayQueue释放出来,当然那必须是NANOSECONDS级别的,我使用MILLISECONDS,就会出现负数,但也是可以的,也能释放出来。

另一个需要改的就是发布者,所以重新写一个ApplicationDelayEventPublisher

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
 
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
@Component
public class ApplicationDelayEventPublisher implements ApplicationRunner {
 
    // ApplicationDelayedEvent需要import进来
    private DelayQueue<ApplicationDelayedEvent> delayQueue = new DelayQueue<>();
 
    @Autowired
    private ApplicationEventPublisher eventPublisher;
 
    @Autowired
    @Qualifier("watchTaskExecutor")
    private ThreadPoolTaskExecutor poolTaskExecutor;
 
    public void publishEvent(ApplicationDelayedEvent event) {
        boolean result = delayQueue.offer(event);
        log.info("加入延迟队列。。。。{}", result);
    }
 
    @Override
    public void run(ApplicationArguments args) throws Exception {
        poolTaskExecutor.execute(() -> watchThread());
    }
 
    private void watchThread() {
        while (true) {
            try {
                log.info("启动延时任务的监听线程。。。。");
                ApplicationDelayedEvent event = this.delayQueue.take();
                log.info("接收到延时任务执行。。。{}", ZonedDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
                eventPublisher.publishEvent(event);
            } catch (InterruptedException e) {
                log.info("启动延时任务的监听线程关闭");
                this.delayQueue.clear();
                break;
            }
        }
    }
}

需要实现ApplicationRunner作为Spring boot的启动时候运行的bean,目的就是开启监听线程,有事件到了执行时间take方法会得到数据,然后调用Spring原生的事件发布。

另外特别说明的就是监听线程不能随便创建,脱离了Spring容器的线程池会造成关闭服务的时候造成无法关闭的现象,所以建议还是自定义一个ThreadPoolTaskExecutor

    @Bean
    public ThreadPoolTaskExecutor watchTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setQueueCapacity(1);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("watch_task_");
 
        // 线程池对拒绝任务的处理策略
//        ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
//        ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
//        ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面。
//        ThreadPoolExecutor.CallerRunsPolicy:由调用者处理该任务 。
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化
        executor.initialize();
        return executor;
    }

最后就是接收事件,跟传统的接收是一样的,异步只需要在配置类上加上@EnableAsync注解就行了,然后在监听的方法上加@Async

import java.util.concurrent.ThreadPoolExecutor;
 
import javax.annotation.PostConstruct;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
@Configuration
@EnableAsync
@ConditionalOnClass(ApplicationDelayEventPublisher.class)
public class DelayEventConfiguration {
 
    @PostConstruct
    public void init() {
        log.info("延迟Spring事件模块启动中。。。");
    }
    
    // 不能和监听线程放到一个线程池,不然无法执行
    @Bean
    public ThreadPoolTaskExecutor poolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(10000);
        executor.setKeepAliveSeconds(30);
        executor.setThreadNamePrefix("my_task_");
 
        // 线程池对拒绝任务的处理策略
//        ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
//        ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
//        ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面。
//        ThreadPoolExecutor.CallerRunsPolicy:由调用者处理该任务 。
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // 初始化
        executor.initialize();
        return executor;
    }
    
    @Bean
    public ThreadPoolTaskExecutor watchTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setQueueCapacity(1);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("watch_task_");
 
        // 线程池对拒绝任务的处理策略
//        ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
//        ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
//        ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面。
//        ThreadPoolExecutor.CallerRunsPolicy:由调用者处理该任务 。
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化
        executor.initialize();
        return executor;
    }
}

    @Async("poolTaskExecutor")
    @EventListener
    public void listenDelayEvent(ApplicationDelayedEvent event) {
        log.info("收到执行事件:{}", event.getSource());
    }

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

SpringBootEvent事件如何实现异步延迟执行

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

SpringBootEvent事件如何实现异步延迟执行

这篇文章主要介绍了SpringBootEvent事件如何实现异步延迟执行问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
2023-02-14

Java如何实现异步延迟队列

这篇文章主要介绍“Java如何实现异步延迟队列”,在日常操作中,相信很多人在Java如何实现异步延迟队列问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Java如何实现异步延迟队列”的疑惑有所帮助!接下来,请跟
2023-07-05

批处理如何实现延迟执行命令

这篇文章将为大家详细讲解有关批处理如何实现延迟执行命令,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。1、2003的工具包里有个sleep.exe2、结合vbs实现的代码代码如下:echo.wscript.
2023-06-09

LINQ如何实现子查询和延迟执行编程

这篇文章将为大家详细讲解有关LINQ如何实现子查询和延迟执行编程,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。LINQ子查询 LINQ子查询是一个包含了另外一个查询的Lambda表达式的查询. 以下的例子
2023-06-17

java代码如何实现异步执行

在Java中,可以使用多线程或者使用Java 8之后引入的CompletableFuture来实现异步执行。使用多线程:Thread thread = new Thread(() -> {// 异步执行的代码逻辑});thread.s
2023-10-25

Laravel如何实现supervisor执行异步进程

今天小编给大家分享一下Laravel如何实现supervisor执行异步进程的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。问
2023-07-04

如何实现Spring Event(异步事件)

这篇文章主要介绍了如何实现Spring Event(异步事件)问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
2023-02-14

JavaScript如何实现异步任务循环顺序执行

今天小编给大家分享一下JavaScript如何实现异步任务循环顺序执行的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。需求场景
2023-07-05

RabbitMQ延迟队列如何实现订单支付结果异步阶梯性通知

小编给大家分享一下RabbitMQ延迟队列如何实现订单支付结果异步阶梯性通知,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!在第三方支付中,例如支付宝、或者微信,对于订单请求,第三方支付系统采用的是消息同步返回、异步通知+主
2023-06-29

游戏开发中网络同步与延迟处理技术(如何实现游戏中的网络同步与延迟优化?)

本文探讨了游戏开发中的网络同步与延迟处理技术,包括:网络同步:帧同步:客户端复制服务器帧顺序。客户端预测和服务器验证:客户端预测并由服务器调整。回滚净代码:服务器保存回滚状态并回滚客户端。延迟优化:减少延迟源:优化网络基础设施。预测和补偿:补偿延迟造成的视觉故障。数据压缩:降低带宽使用量。延迟补偿:调整游戏逻辑缓解延迟影响。游戏服务器位置:靠近玩家以减少延迟。其他策略包括服务器管理、玩家教育和服务器授权。通过结合这些技术,开发者可为玩家提供流畅的多人游戏体验。
游戏开发中网络同步与延迟处理技术(如何实现游戏中的网络同步与延迟优化?)
2024-04-02

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录