我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Java手写线程池之向JDK线程池进发

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Java手写线程池之向JDK线程池进发

前言

在前面的文章自己动手写乞丐版线程池中,我们写了一个非常简单的线程池实现,这个只是一个非常简单的实现,在本篇文章当中我们将要实现一个和JDK内部实现的线程池非常相似的线程池。

JDK线程池一瞥

我们首先看一个JDK给我们提供的线程池ThreadPoolExecutor的构造函数的参数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

参数解释:

1.corePoolSize:这个参数你可以理解为线程池当中至少需要 corePoolSize 个线程,初始时线程池当中线程的个数为0,当线程池当中线程的个数小于 corePoolSize 每次提交一个任务都会创建一个线程,并且先执行这个提交的任务,然后再去任务队列里面去获取新的任务,然后再执行。

2.maximumPoolSize:这个参数指的是线程池当中能够允许的最大的线程的数目,当任务队列满了之后如果这个时候有新的任务想要加入队列当中,当发现队列满了之后就创建新的线程去执行任务,但是需要满足最大的线程的个数不能够超过 maximumPoolSize 。

3.keepAliveTime 和 unit:这个主要是用于时间的表示,当队列当中多长时间没有数据的时候线程自己退出,前面谈到了线程池当中任务过多的时候会超过 corePoolSize ,当线程池闲下来的时候这些多余的线程就可以退出了。

4.workQueue:这个就是用于保存任务的阻塞队列。

5.threadFactory:这个参数倒不是很重要,线程工厂。

6.handler:这个表示拒绝策略,JDK给我们提供了四种策略:

  • AbortPolicy:抛出异常。
  • DiscardPolicy:放弃这个任务。
  • CallerRunPolicy:提交任务的线程执行。
  • DiscardOldestPolicy:放弃等待时间最长的任务。

如果上面的参数你不能够理解,可以先阅读这篇文章自己动手写乞丐版线程池。基于上面谈到的参数,线程池当中提交任务的流程大致如下图所示:

自己动手实现线程池

根据前面的参数分析我们自己实现的线程池需要实现一下功能:

  • 能够提交Runnable的任务和Callable的任务。
  • 线程池能够自己实现动态的扩容和所容,动态调整线程池当中线程的数目,当任务多的时候能够增加线程的数目,当任务少的时候多出来的线程能够自动退出。
  • 有自己的拒绝策略,当任务队列满了,线程数也达到最大的时候,需要拒绝提交的任务。

线程池参数介绍

  private AtomicInteger ct = new AtomicInteger(0); // 当前在执行任务的线程个数
  private int corePoolSize;
  private int maximumPoolSize;
  private long keepAliveTime;
  private TimeUnit unit;
  private BlockingQueue<Runnable> taskQueue;
  private RejectPolicy policy;

  private ArrayList<Worker> workers = new ArrayList<>();

  private volatile boolean isStopped;
  private boolean useTimed;

参数解释如下:

  • ct:表示当前线程池当中线程的个数。
  • corePoolSize:线程池当中核心线程的个数,意义和上面谈到的JDK的线程池意义一致。
  • maximumPoolSize:线程池当中最大的线程个数,意义和上面谈到的JDK的线程池意义一致。
  • keepAliveTime 和 unit:和JDK线程池的参数意义一致。
  • taskQueue:任务队列,用不保存提交的任务。
  • policy:拒绝策略,主要有一下四种策略:
public enum RejectPolicy {

  ABORT,
  CALLER_RUN,
  DISCARD_OLDEST,
  DISCARD
}

workers:用于保存工作线程。

isStopped:线程池是否被关闭了。

useTimed:主要是用于表示是否使用上面的 keepAliveTime 和 unit,如果使用就是在一定的时间内,如果没有从任务队列当中获取到任务,线程就从线程池退出,但是需要保证线程池当中最小的线程个数不小于 corePoolSize 。

实现Runnable

  // 下面这个方法是向线程池提交任务
  public void execute(Runnable runnable) throws InterruptedException {
    checkPoolState();

    if (addWorker(runnable, false)  // 如果能够加入新的线程执行任务 加入成功就直接返回
            || !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 返回 false 说明提交任务失败 任务队列已经满了
            || addWorker(runnable, true)) // 使用能够使用的最大的线程数 (maximumPoolSize) 看是否能够产生新的线程
      return;

    // 如果任务队列满了而且不能够加入新的线程 则拒绝这个任务
    if (!taskQueue.offer(runnable))
      reject(runnable);
  }

在上面的代码当中:

checkPoolState函数是检查线程池的状态,当线程池被停下来之后就不能够在提交任务:

  private void checkPoolState() {
    if (isStopped) {
      // 如果线程池已经停下来了,就不在向任务队列当中提交任务了
      throw new RuntimeException("thread pool has been stopped, so quit submitting task");
    }
  }

addWorker函数是往线程池当中提交任务并且产生一个线程,并且这个线程执行的第一个任务就是传递的参数。max表示线程的最大数目,max == true 的时候表示使用 maximumPoolSize 否则使用 corePoolSize,当返回值等于 true 的时候表示执行成功,否则表示执行失败。

  
  public synchronized boolean addWorker(Runnable runnable, boolean max) {

    if (ct.get() >= corePoolSize && !max)
      return false;
    if (ct.get() >= maximumPoolSize && max)
      return false;
    Worker worker = new Worker(runnable);
    workers.add(worker);
    Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1));
    thread.start();
    return true;
  }

实现Callable

这个函数其实比较简单,只需要将传入的Callable对象封装成一个FutureTask对象即可,因为FutureTask实现了Callable和Runnable两个接口,然后将这个结果返回即可,得到这个对象,再调用对象的 get 方法就能够得到结果。

  public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException {
    checkPoolState();
    FutureTask<V> futureTask = new FutureTask<>(task);
    execute(futureTask);
    return futureTask;
  }

拒绝策略的实现

根据前面提到的各种策略的具体实现方式,具体的代码实现如下所示:

  private void reject(Runnable runnable) throws InterruptedException {
    switch (policy) {
      case ABORT:
        throw new RuntimeException("task queue is full");
      case CALLER_RUN:
        runnable.run();
      case DISCARD: // 直接放弃这个任务
        return;
      case DISCARD_OLDEST:
        // 放弃等待时间最长的任务 也就是队列当中的第一个任务
        taskQueue.poll();
        execute(runnable); // 重新执行这个任务
    }
  }

线程池关闭实现

一共两种方式实现线程池关闭:

  • 直接关闭线程池,不管任务队列当中的任务是否被全部执行完成。
  • 安全关闭线程池,先等待任务队列当中所有的任务被执行完成,再关闭线程池,但是在这个过程当中不允许继续提交任务了,这一点已经在函数 checkPoolState 当中实现了。
  // 强制关闭线程池
  public synchronized void stop() {
    isStopped = true;
    for (Worker worker : workers) {
      worker.stopWorker();
    }
  }

  public synchronized void shutDown() {
    // 先表示关闭线程池 线程就不能再向线程池提交任务
    isStopped = true;
    // 先等待所有的任务执行完成再关闭线程池
    waitForAllTasks();
    stop();
  }

  private void waitForAllTasks() {
    // 当线程池当中还有任务的时候 就不退出循环
    while (taskQueue.size() > 0) {
      Thread.yield();
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

工作线程的工作实现

    @Override
    public void run() {
      // 先执行传递过来的第一个任务 这里是一个小的优化 让线程直接执行第一个任务 不需要
      // 放入任务队列再取出来执行了
      firstTask.run();

      thisThread = Thread.currentThread();
      while (!isStopped) {
        try {
          // 是否使用时间就在这里显示出来了
          Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take();
          if (task == null) {
            int i;
            boolean exit = true;
            // 如果当前线程数大于核心线程数 则使用 CAS 去退出 用于保证在线程安全下的退出
            // 且保证线程的个数不小于 corePoolSize 下面这段代码需要仔细分析一下
            if (ct.get() > corePoolSize) {
              do{
                i = ct.get();
                if (i <= corePoolSize) {
                  exit = false;
                  break;
                }
              }while (!ct.compareAndSet(i, i - 1));
              if (exit) {
                return;
              }
            }
          }else {
            task.run();
          }
        } catch (InterruptedException e) {
          // do nothing
        }
      }
    }

我们现在来仔细分析一下,线程退出线程池的时候是如何保证线程池当中总的线程数是不小于 corePoolSize 的!首先整体的框架是使用 CAS 进行实现,具体代码为 do ... while 操作,然后在 while 操作里面使用 CAS 进行测试替换,如果没有成功再次获取 ,当线程池当中核心线程的数目小于等于 corePoolSize 的时候也需要退出循环,因为线程池当中线程的个数不能小于 corePoolSize 。因此使用 break 跳出循环的线程是不会退出线程池的。

线程池实现的BUG

在我们自己实现的线程池当中当线程退出的时候,workers 当中还保存这指向这个线程的对象,但是当线程退出的时候我们还没有在 workers 当中删除这个对象,因此这个线程对象不会被垃圾回收器收集掉,但是我们这个只是一个线程池实现的例子而已,并不用于生产环境,只是为了帮助大家理解线程池的原理。

完整代码

package cscore.concurrent.java.threadpoolv2;


import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPool {

  private AtomicInteger ct = new AtomicInteger(0); // 当前在执行任务的线程个数
  private int corePoolSize;
  private int maximumPoolSize;
  private long keepAliveTime;
  private TimeUnit unit;
  private BlockingQueue<Runnable> taskQueue;
  private RejectPolicy policy;

  private ArrayList<Worker> workers = new ArrayList<>();

  private volatile boolean isStopped;
  private boolean useTimed;

  public int getCt() {
    return ct.get();
  }

  public ThreadPool(int corePoolSize, int maximumPoolSize, TimeUnit unit, long keepAliveTime, RejectPolicy policy
          , int maxTasks) {
    // please add -ea to vm options to make assert keyword enable
    assert corePoolSize > 0;
    assert maximumPoolSize > 0;
    assert keepAliveTime >= 0;
    assert maxTasks > 0;

    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.unit = unit;
    this.policy = policy;
    this.keepAliveTime = keepAliveTime;
    taskQueue = new ArrayBlockingQueue<Runnable>(maxTasks);
    useTimed = keepAliveTime != 0;
  }

  
  public synchronized boolean addWorker(Runnable runnable, boolean max) {

    if (ct.get() >= corePoolSize && !max)
      return false;
    if (ct.get() >= maximumPoolSize && max)
      return false;
    Worker worker = new Worker(runnable);
    workers.add(worker);
    Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1));
    thread.start();
    return true;
  }

  // 下面这个方法是向线程池提交任务
  public void execute(Runnable runnable) throws InterruptedException {
    checkPoolState();

    if (addWorker(runnable, false)  // 如果能够加入新的线程执行任务 加入成功就直接返回
            || !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 返回 false 说明提交任务失败 任务队列已经满了
            || addWorker(runnable, true)) // 使用能够使用的最大的线程数 (maximumPoolSize) 看是否能够产生新的线程
      return;

    // 如果任务队列满了而且不能够加入新的线程 则拒绝这个任务
    if (!taskQueue.offer(runnable))
      reject(runnable);
  }

  private void reject(Runnable runnable) throws InterruptedException {
    switch (policy) {
      case ABORT:
        throw new RuntimeException("task queue is full");
      case CALLER_RUN:
        runnable.run();
      case DISCARD:
        return;
      case DISCARD_OLDEST:
        // 放弃等待时间最长的任务
        taskQueue.poll();
        execute(runnable);
    }
  }

  private void checkPoolState() {
    if (isStopped) {
      // 如果线程池已经停下来了,就不在向任务队列当中提交任务了
      throw new RuntimeException("thread pool has been stopped, so quit submitting task");
    }
  }

  public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException {
    checkPoolState();
    FutureTask<V> futureTask = new FutureTask<>(task);
    execute(futureTask);
    return futureTask;
  }

  // 强制关闭线程池
  public synchronized void stop() {
    isStopped = true;
    for (Worker worker : workers) {
      worker.stopWorker();
    }
  }

  public synchronized void shutDown() {
    // 先表示关闭线程池 线程就不能再向线程池提交任务
    isStopped = true;
    // 先等待所有的任务执行完成再关闭线程池
    waitForAllTasks();
    stop();
  }

  private void waitForAllTasks() {
    // 当线程池当中还有任务的时候 就不退出循环
    while (taskQueue.size() > 0) {
      Thread.yield();
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  class Worker implements Runnable {

    private Thread thisThread;

    private final Runnable firstTask;
    private volatile boolean isStopped;

    public Worker(Runnable firstTask) {
      this.firstTask = firstTask;
    }

    @Override
    public void run() {
      // 先执行传递过来的第一个任务 这里是一个小的优化 让线程直接执行第一个任务 不需要
      // 放入任务队列再取出来执行了
      firstTask.run();

      thisThread = Thread.currentThread();
      while (!isStopped) {
        try {
          Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take();
          if (task == null) {
            int i;
            boolean exit = true;
            if (ct.get() > corePoolSize) {
              do{
                i = ct.get();
                if (i <= corePoolSize) {
                  exit = false;
                  break;
                }
              }while (!ct.compareAndSet(i, i - 1));
              if (exit) {
                return;
              }
            }
          }else {
            task.run();
          }
        } catch (InterruptedException e) {
          // do nothing
        }
      }
    }

    public synchronized void stopWorker() {
      if (isStopped) {
        throw new RuntimeException("thread has been interrupted");
      }
      isStopped = true;
      thisThread.interrupt();
    }

  }

}

线程池测试

package cscore.concurrent.java.threadpoolv2;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

public class Test {

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    var pool = new ThreadPool(2, 5, TimeUnit.SECONDS, 10, RejectPolicy.ABORT, 100000);

    for (int i = 0; i < 10; i++) {
      RunnableFuture<Integer> submit = pool.submit(() -> {
        System.out.println(Thread.currentThread().getName() + " output a");
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        return 0;
      });
      System.out.println(submit.get());
    }
    int n = 15;
    while (n-- > 0) {
      System.out.println("Number Threads = " + pool.getCt());
      Thread.sleep(1000);
    }
    pool.shutDown();
  }
}

上面测试代码的输出结果如下所示:

ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
Number Threads = 5
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 3
Number Threads = 2
Number Threads = 2
Number Threads = 2
Number Threads = 2

从上面的代码可以看出我们实现了正确的任务实现结果,同时线程池当中的核心线程数从 2 变到了 5 ,当线程池当中任务队列全部别执行完成之后,线程的数目重新降下来了,这确实是我们想要达到的结果。

总结

在本篇文章当中主要给大家介绍了如何实现一个类似于JDK中的线程池,里面有非常多的实现细节,大家可以仔细捋一下其中的流程,对线程池的理解将会非常有帮助。

以上就是Java手写线程池之向JDK线程池进发的详细内容,更多关于Java手写线程池的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Java手写线程池之向JDK线程池进发

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Java手写线程池之向JDK线程池进发

在前面的文章自己动手写乞丐版线程池中,我们写了一个非常简单的线程池实现,这个只是一个非常简单的实现,在本篇文章当中我们将要实现一个和JDK内部实现的线程池非常相似的线程池,需要的可以了解一下
2022-11-13

Python并发编程之线程池/进程池

原文来自开源中国前言python标准库提供线程和多处理模块来编写相应的多线程/多进程代码,但当项目达到一定规模时,频繁地创建/销毁进程或线程是非常消耗资源的,此时我们必须编写自己的线程池/进程池来交换时间空间。但是从Python3.2开始,
2023-06-02

Java并发之怎么使用线程池

这篇文章主要介绍“Java并发之怎么使用线程池”,在日常操作中,相信很多人在Java并发之怎么使用线程池问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Java并发之怎么使用线程池”的疑惑有所帮助!接下来,请跟
2023-06-16

关于java连接池/线程池/内存池/进程池等汇总分析

这篇文章主要介绍了关于java连接池/线程池/内存池/进程池等汇总分析,本文将介绍池技术的由来、原理、优缺点以及常见的池技术类型,需要的朋友可以参考下
2023-05-16

10分钟带你徒手写个Java线程池

我们自己手动实现的线程池要比Java自身的线程池简单的多,我们去掉了各种复杂的处理方式,只保留了最核心的原理,感兴趣的小伙伴可以跟随小编一起学习一下
2023-05-16

Java怎么实现手写线程池并测试

本篇内容介绍了“Java怎么实现手写线程池并测试”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!线程池的核心流程:在线程池核心原理的源码中,涉
2023-07-05

Java concurrency线程池之线程池原理(三)_动力节点Java学院整理

线程有5种状态:新建状态,就绪状态,运行状态,阻塞状态,死亡状态。线程池也有5种状态;然而,线程池不同于线程,线程池的5种状态是:Running, SHUTDOWN, STOP, TIDYING, TERMINATED。线程池状态定义代码如
2023-05-31

Java concurrency线程池之线程池原理(二)_动力节点Java学院整理

线程池示例在分析线程池之前,先看一个简单的线程池示例。import java.util.concurrent.Executors;import java.util.concurrent.ExecutorService;public clas
2023-05-31

Java并发编程之线程池的示例分析

这篇文章将为大家详细讲解有关Java并发编程之线程池的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。什么是线程池是一种基于池化思想管理线程的工具。池化技术:池化技术简单点来说,就是提前保存大量的资
2023-06-20

Java实现手写一个线程池的示例代码

线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和细节吗?本文就来通过手写一个简单的线程池框架,去掌握线程池的基本原理,感兴趣的可以学习一下
2022-11-13

Java实现手写线程池实例并测试详解

这篇文章主要来模拟一下线程池和工作队列的流程,以及编写代码和测试类进行测试。文中的示例代码讲解详细,感兴趣的小伙伴可以了解一下
2023-02-22

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录