我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Java线程池流程编排运用实战源码

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Java线程池流程编排运用实战源码

引导语

在线程池的面试中,面试官除了喜欢问 ThreadPoolExecutor 的底层源码外,还喜欢问你有没有在实际的工作中用过 ThreadPoolExecutor,我们在并发集合类的《场景集合:并发 List、Map 的应用场景》一文中说过一种简单的流程引擎,如果没有看过的同学,可以返回去看一下。

本章就在流程引擎的基础上运用 ThreadPoolExecutor,使用线程池实现 SpringBean 的异步执行。

1、流程引擎关键代码回顾

《场景集合:并发 List、Map 的应用场景》文中流程引擎执行 SpringBean 的核心代码为:

  // 批量执行 Spring Bean
  private void stageInvoke(String flowName, StageEnum stage, FlowContent content) {
    List<DomainAbilityBean>
        domainAbilitys =
        FlowCenter.flowMap.getOrDefault(flowName, Maps.newHashMap()).get(stage);
    if (CollectionUtils.isEmpty(domainAbilitys)) {
      throw new RuntimeException("找不到该流程对应的领域行为" + flowName);
    }
    for (DomainAbilityBean domainAbility : domainAbilitys) {
      // 执行 Spring Bean
      domainAbility.invoke(content);
    }
  }

 入参是 flowName(流程名称)、stage(阶段)、content(上下文),其中 stage 中会执行很多 SpringBean,SpringBean 被执行的代码是 domainAbility.invoke(content)。

2、异步执行 SpringBean

从上述代码中,我们可以看到所有的 SpringBean 都是串行执行的,效率较低,我们在实际业务中发现,有的 SpringBean 完全可以异步执行,这样既能完成业务请求,又能减少业务处理的 rt,对于这个需求,我们条件反射的有了两个想法:

需要新开线程来异步执行 SpringBean,可以使用 Runable 或者 Callable;业务请求量很大,我们不能每次来一个请求,就开一个线程,我们应该让线程池来管理异步执行的线程。

于是我们决定使用线程池来完成这个需求。

3、如何区分异步的 SpringBean

我们的 SpringBean 都是实现 DomainAbilityBean 这个接口的,接口定义如下:

public interface DomainAbilityBean {
  
  FlowContent invoke(FlowContent content);
}

从接口定义上来看,没有预留任何地方来标识该 SpringBean 应该是同步执行还是异步执行,这时候我们可以采取注解的方式,我们新建一个注解,只要 SpringBean 上有该注解,表示该 SpringBean 应该异步执行,否则应该同步执行,新建的注解如下:


@Target(ElementType.TYPE)// 表示该注解应该打在类上
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface AsyncComponent {
 
}

接着我们新建了两个 SpringBean,并在其中一个 SpringBean 上打上异步的注解,并且打印出执行 SpringBean 的线程名称,如下图:

图片描述

 图中实现了两个 SpringBean:BeanOne 和 BeanTwo,其中 BeanTwo 被打上了 AsyncComponent 注解,表明 BeanTwo 应该被异步执行,两个 SpringBean 都打印出执行的线程的名称。

4、mock 流程引擎数据中心

 《场景集合:并发 List、Map 的应用场景》一文中,我们说可以从数据库中加载出流程引擎需要的数据,此时我们 mock 一下,mock 的代码如下:

@Component
public class FlowCenter {
  
  public static final Map<String, Map<StageEnum, List<DomainAbilityBean>>> flowMap
      = Maps.newConcurrentMap();
  
  @PostConstruct
  public void init() {
      // 初始化 flowMap mock
    Map<StageEnum, List<DomainAbilityBean>> stageMap = flowMap.getOrDefault("flow1",Maps.newConcurrentMap());
    for (StageEnum value : StageEnum.values()) {
      List<DomainAbilityBean> domainAbilitys = stageMap.getOrDefault(value, Lists.newCopyOnWriteArrayList());
      if(CollectionUtils.isEmpty(domainAbilitys)){
        domainAbilitys.addAll(ImmutableList.of(
            ApplicationContextHelper.getBean(BeanOne.class),
            ApplicationContextHelper.getBean(BeanTwo.class)
        ));
        stageMap.put(value,domainAbilitys);
      }
    }
    flowMap.put("flow1",stageMap);
    // 打印出加载完成之后的数据结果
    log.info("init success,flowMap is {}", JSON.toJSONString(flowMap));
  }
}

5、新建线程池

在以上操作完成之后,只剩下最后一步了,之前我们执行 SpringBean 时,是这行代码:domainAbility.invoke(content);

现在我们需要区分 SpringBean 是否是异步的,如果是异步的,丢到线程池中去执行,如果是同步的,仍然使用原来的方法进行执行,于是我们把这些逻辑封装到一个工具类中,工具类如下:

public class ComponentExecutor {
	// 我们新建了一个线程池
  private static ExecutorService executor = new ThreadPoolExecutor(15, 15,
                                                                   365L, TimeUnit.DAYS,
                                                                   new LinkedBlockingQueue<>());
	// 如果 SpringBean 上有 AsyncComponent 注解,表示该 SpringBean 需要异步执行,就丢到线程池中去
  public static final void run(DomainAbilityBean component, FlowContent content) {
    // 判断类上是否有 AsyncComponent 注解
    if (AnnotationUtils.isAnnotationPresent(AsyncComponent.class, AopUtils.getTargetClass(component))) {
      // 提交到线程池中
      executor.submit(() -> { component.invoke(content); });
      return;
    }
    // 同步 SpringBean 直接执行。
    component.invoke(content);
  }
}

我们把原来的执行代码替换成使用组件执行器执行,如下图:

图片描述

6、测试 

 以上步骤完成之后,简单的流程引擎就已经完成了,我们简单地在项目启动的时候加上测试,代码如下:

更严谨的做法,是会写单元测试来测试流程引擎,为了快一点,我们直接在项目启动类上加上了测试代码。

运行之后的关键结果如下: 

[main] demo.sixth.SynchronizedDemo: SynchronizedDemo init begin
[main] demo.sixth.SynchronizedDemo: SynchronizedDemo init end
[main] demo.three.flow.FlowCenter : init success,flowMap is {"flow1":{"PARAM_VALID":[{},{}],"AFTER_TRANSACTION":[{"$ref":"$.flow1.PARAM_VALID[0]"},{"$ref":"$.flow1.PARAM_VALID[1]"}],"BUSINESS_VALID":[{"$ref":"$.flow1.PARAM_VALID[0]"},{"$ref":"$.flow1.PARAM_VALID[1]"}],"IN_TRANSACTION":[{"$ref":"$.flow1.PARAM_VALID[0]"},{"$ref":"$.flow1.PARAM_VALID[1]"}]}}
o.s.j.e.a.AnnotationMBeanExporter  : Registering beans for JMX exposure on startup
[main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http)
[main] demo.DemoApplication : Started DemoApplication in 5.377 seconds (JVM running for 6.105)
[main] demo.three.flow.BeanOne : BeanOne is run,thread name is main
[main] demo.three.flow.BeanOne : BeanOne is run,thread name is main
[pool-1-thread-1] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-1
[main] demo.three.flow.BeanOne : BeanOne is run,thread name is main
[pool-1-thread-2] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-2
[pool-1-thread-3] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-3
[main] demo.three.flow.BeanOne : BeanOne is run,thread name is main
[pool-1-thread-4] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-4

 从运行结果中,我们可以看到 BeanTwo 已经被多个不同的线程异步执行了。

7、总结

 这是一个线程池在简单流程引擎上的运用实站,虽然这个流程引擎看起来比较简单,但在实际工作中,还是非常好用的,大家可以把代码拉下来,自己尝试一下,调试一下参数,比如当我新增 SpringBean 的时候,流程引擎的表现如何。

以上就是Java线程池流程编排运用实战源码的详细内容,更多关于Java线程池流程编排运用的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Java线程池流程编排运用实战源码

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

java线程池的实现原理源码分析

这篇文章主要介绍“java线程池的实现原理源码分析”,在日常操作中,相信很多人在java线程池的实现原理源码分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”java线程池的实现原理源码分析”的疑惑有所帮助!
2023-06-30

Java如何使用线程池实现socket编程

这篇文章主要讲解了“Java如何使用线程池实现socket编程”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java如何使用线程池实现socket编程”吧!前言以多个客户端和一个服务端的so
2023-06-29

在java项目中使用线程池实现并发编程

今天就跟大家聊聊有关在java项目中使用线程池实现并发编程,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。一、任务和执行策略之间的隐性耦合Executor可以将任务的提交和任务的执行策
2023-05-31

怎样深度解读java线程池设计思想及源码实现

这篇文章给大家介绍怎样深度解读java线程池设计思想及源码实现,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。前言线程池是非常重要的工具,如果你要成为一个好的工程师,还是得比较好地掌握这个知识,很多线上问题都是因为没有用
2023-06-02

在java项目中使用线程池如何实现获取运行线程数并控制线程启动速度

这期内容当中小编将会给大家带来有关在java项目中使用线程池如何实现获取运行线程数并控制线程启动速度,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。在java里, 我们可以使用Executors.newFi
2023-05-31

Java Lambda 表达式实战:用代码解锁函数式编程的奥秘

Java Lambda 表达式是 Java 8 中引入的一项重要特性,它允许使用更简洁、更具表达性的方式编写代码。本文将通过丰富的演示代码,带领你深入探索 Lambda 表达式的奥秘,帮助你掌握函数式编程的精髓。
Java Lambda 表达式实战:用代码解锁函数式编程的奥秘
2024-02-26

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录