我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Java多线程并发AbstractQueuedSynchronizer怎么使用

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Java多线程并发AbstractQueuedSynchronizer怎么使用

这篇文章主要介绍“Java多线程并发AbstractQueuedSynchronizer怎么使用”,在日常操作中,相信很多人在Java多线程并发AbstractQueuedSynchronizer怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Java多线程并发AbstractQueuedSynchronizer怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer 简称 AQS ,抽象队列同步器,用来实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器的框架。这个类旨在为大多数依赖单个原子 int 值来表示同步状态的同步器提供基础的能力封装。 例如 ReentrantLock、Semaphore 和 FutureTask 等等都是基于 AQS 实现的,我们也可以继承 AQS 实现自定义同步器。

核心思想

网络上常见的解释是:

如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

个人理解,可以把 AQS 当成一把锁,它内部通过一个队列记录了所有要使用锁的请求线程,并且管理锁自己当前的状态(锁定、空闲等状态)。相当于 AQS 就是共享资源本身,当有线程请求这个资源是,AQS 将请求资源的线程记录当前工作线程,并将自身设置为锁定状态。后续其他线程请求这个 AQS 时,将请求线程记录到等待队列中,其他线程此时未获取到锁,进入阻塞等待状态。

为什么需要 AQS

在深入 AQS 前,我们应该持有一个疑问是为什么需要 AQS ?synchronized 关键字和 CAS 原子类都提供了丰富的同步方案了。

但在实际的需求中,对同步的需求是各式各样的,比如,我们需要对一个锁加上超时时间,那么光凭 synchronized 关键字或是 CAS 就无法实现了,需要对其进行二次封装。而 JDK 中提供了丰富的同步方案,比如 ReentrantLock ,而 ReentrantLock 是就是基于 AQS 实现的。

用法

这部分内容来自 JDK 的注释

要将此类用作同步器的基础,请在适用时重新定义以下方法,方法是使用 getState、setState 和/或 compareAndSetState 检查和/或修改同步状态:

  • tryAcquire

  • tryRelease

  • tryAcquireShared

  • tryReleaseShared

  • isHeldExclusively

默认情况下,这些方法中的每一个都会引发 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该是短暂的而不是阻塞的。 定义这些方法是使用此类的唯一受支持的方法。 所有其他方法都被声明为最终方法,因为它们不能独立变化。

您可能还会发现从 AbstractOwnableSynchronizer 继承的方法对于跟踪拥有独占同步器的线程很有用。 鼓励您使用它们——这使监视和诊断工具能够帮助用户确定哪些线程持有锁。

即使此类基于内部 FIFO 队列,它也不会自动执行 FIFO 采集策略。

独占同步的核心形式为:

   Acquire:       while (!tryAcquire(arg)) {          enqueue thread if it is not already queued;          possibly block current thread;       }     Release:       if (tryRelease(arg))          unblock the first queued thread;

(共享模式类似,但可能涉及级联信号。)

因为在入队之前调用了获取中的检查,所以新获取的线程可能会抢在其他被阻塞和排队的线程之前。 但是,如果需要,您可以定义 tryAcquire 和/或 tryAcquireShared 以通过内部调用一个或多个检查方法来禁用插入,从而提供公平的 FIFO 获取顺序。 特别是,如果 hasQueuedPredecessors(一种专门为公平同步器使用的方法)返回 true,大多数公平同步器可以定义 tryAcquire 返回 false。 其他变化是可能的。

默认插入(也称为贪婪、放弃和避免护送)策略的吞吐量和可扩展性通常最高。 虽然这不能保证公平或无饥饿,但允许较早排队的线程在较晚的排队线程之前重新竞争,并且每次重新竞争都有无偏见的机会成功对抗传入线程。 此外,虽然获取不是通常意义上的“旋转”,但它们可能会在阻塞之前执行多次调用 tryAcquire 并穿插其他计算。 当独占同步只是短暂地保持时,这提供了自旋的大部分好处,而没有大部分责任。 如果需要,您可以通过预先调用获取具有“快速路径”检查的方法来增加这一点,可能会预先检查 hasContended 和/或 hasQueuedThreads 以仅在同步器可能不会被争用时才这样做。

此类通过将其使用范围专门用于可以依赖 int 状态、获取和释放参数以及内部 FIFO 等待队列的同步器,部分地为同步提供了高效且可扩展的基础。 如果这还不够,您可以使用原子类、您自己的自定义 java.util.Queue 类和 LockSupport 阻塞支持从较低级别构建同步器。

用法示例

这是一个不可重入互斥锁类,它使用值 0 表示未锁定状态,使用值 1 表示锁定状态。 虽然不可重入锁并不严格要求记录当前所有者线程,但无论如何,此类都会这样做以使使用情况更易于监控。

它还支持条件并公开一些检测方法:

class Mutex implements Lock, java.io.Serializable {   // Our internal helper class   private static class Sync extends AbstractQueuedSynchronizer {     // Acquires the lock if state is zero     public boolean tryAcquire(int acquires) {       assert acquires == 1; // Otherwise unused       if (compareAndSetState(0, 1)) {         setExclusiveOwnerThread(Thread.currentThread());         return true;       }       return false;     }     // Releases the lock by setting state to zero     protected boolean tryRelease(int releases) {       assert releases == 1; // Otherwise unused       if (!isHeldExclusively())         throw new IllegalMonitorStateException();       setExclusiveOwnerThread(null);       setState(0);       return true;     }     // Reports whether in locked state     public boolean isLocked() {       return getState() != 0;     }     public boolean isHeldExclusively() {       // a data race, but safe due to out-of-thin-air guarantees       return getExclusiveOwnerThread() == Thread.currentThread();     }     // Provides a Condition     public Condition newCondition() {       return new ConditionObject();     }     // Deserializes properly     private void readObject(ObjectInputStream s)         throws IOException, ClassNotFoundException {       s.defaultReadObject();       setState(0); // reset to unlocked state     }   }   // The sync object does all the hard work. We just forward to it.   private final Sync sync = new Sync();   public void lock()              { sync.acquire(1); }   public boolean tryLock()        { return sync.tryAcquire(1); }   public void unlock()            { sync.release(1); }   public Condition newCondition() { return sync.newCondition(); }   public boolean isLocked()       { return sync.isLocked(); }   public boolean isHeldByCurrentThread() {     return sync.isHeldExclusively();   }   public boolean hasQueuedThreads() {     return sync.hasQueuedThreads();   }   public void lockInterruptibly() throws InterruptedException {     sync.acquireInterruptibly(1);   }   public boolean tryLock(long timeout, TimeUnit unit)       throws InterruptedException {     return sync.tryAcquireNanos(1, unit.toNanos(timeout));   } }

这是一个类似于 CountDownLatch 的锁存器类,只是它只需要一个信号即可触发。 因为锁存器是非独占的,所以它使用共享的获取和释放方法。

 class BooleanLatch {   private static class Sync extends AbstractQueuedSynchronizer {     boolean isSignalled() { return getState() != 0; }     protected int tryAcquireShared(int ignore) {       return isSignalled() ? 1 : -1;     }     protected boolean tryReleaseShared(int ignore) {       setState(1);       return true;     }   }   private final Sync sync = new Sync();   public boolean isSignalled() { return sync.isSignalled(); }   public void signal()         { sync.releaseShared(1); }   public void await() throws InterruptedException {     sync.acquireSharedInterruptibly(1);   } }

AQS 底层原理

父类 AbstractOwnableSynchronizer

AbstractQueuedSynchronizer 继承自 AbstractOwnableSynchronizer ,后者逻辑十分简单:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {    private static final long serialVersionUID = 3737899427754241961L;    protected AbstractOwnableSynchronizer() { }    private transient Thread exclusiveOwnerThread;    // 设置当前持有锁的线程    protected final void setExclusiveOwnerThread(Thread thread) {        exclusiveOwnerThread = thread;    }    protected final Thread getExclusiveOwnerThread() {        return exclusiveOwnerThread;    }}

AbstractOwnableSynchronizer 只是定义了设置持有锁的线程的能力。

CLH 队列

AQS 的等待队列是 CLH (Craig , Landin , and Hagersten) 锁定队列的变体,CLH 锁通常用于自旋锁。AQS 将每个请求共享资源的线程封装程一个 CLH 节点来实现的,这个节点的定义是:

        abstract static class Node {        volatile Node prev;       // initially attached via casTail        volatile Node next;       // visibly nonnull when signallable        Thread waiter;            // visibly nonnull when enqueued        volatile int status;      // written by owner, atomic bit ops by others        // methods for atomic operations        final boolean casPrev(Node c, Node v) {  // for cleanQueue            return U.weakCompareAndSetReference(this, PREV, c, v); // 通过 CAS 确保同步设置 prev 的值        }        final boolean casNext(Node c, Node v) {  // for cleanQueue            return U.weakCompareAndSetReference(this, NEXT, c, v);        }        final int getAndUnsetStatus(int v) {     // for signalling            return U.getAndBitwiseAndInt(this, STATUS, ~v);        }        final void setPrevRelaxed(Node p) {      // for off-queue assignment            U.putReference(this, PREV, p);        }        final void setStatusRelaxed(int s) {     // for off-queue assignment            U.putInt(this, STATUS, s);        }        final void clearStatus() {               // for reducing unneeded signals            U.putIntOpaque(this, STATUS, 0);        }        private static final long STATUS = U.objectFieldOffset(Node.class, "status");        private static final long NEXT = U.objectFieldOffset(Node.class, "next");        private static final long PREV = U.objectFieldOffset(Node.class, "prev");    }

CLH 的节点的数据结构是一个双向链表的节点,只不过每个操作都是经过 CAS 确保线程安全的。要加入 CLH 锁队列,您可以将其自动拼接为新的尾部;要出队,需要设置 head 字段,以便下一个符合条件的等待节点成为新的头节点:

 +------+  prev +-------+  prev +------+ |      | <---- |       | <---- |      | | head | next  | first | next  | tail | |      | ----> |       | ----> |      | +------+       +-------+       +------+

Node 中的 status 字段表示当前节点代表的线程的状态。

status 存在三种状态:

    static final int WAITING   = 1;          // must be 1    static final int CANCELLED = 0x80000000; // must be negative     static final int COND      = 2;          // in a condition wait
  • WAITING:表示等待状态,值为 1。

  • CANCELLED:表示当前线程被取消,为 0x80000000。

  • COND:表示当前节点在等待条件,也就是在条件等待队列中,值为 2。

在上面的 COND 中,提到了一个条件等待队列的概念。

首先,Node 是一个静态抽象类,它在 AQS 中存在三种实现类:

  • ExclusiveNode

  • SharedNode

  • ConditionNode

前两者都是空实现:

    static final class ExclusiveNode extends Node { }    static final class SharedNode extends Node { }

而最后的 ConditionNode 多了些内容:

    static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {        ConditionNode nextWaiter;         // 检查线程是否中断或当前线程的状态已取消等待。        public final boolean isReleasable() {            return status <= 1 || Thread.currentThread().isInterrupted();        }        public final boolean block() {            while (!isReleasable()) LockSupport.park();            return true;        }    }

ConditionNode 拓展了两个方法:

  • 检查线程状态是否处于等待。

  • 阻塞当前线程:当前线程正在等待执行,通过 LockSupport.park() 阻塞当前线程。这里通过 while 循环持续重试,尝试阻塞线程。

而到这一步,所有的信息都指向了一个相关的类 Condition 。

Condition

AQS 中的 Condition 的实现是内部类 ConditionObject :

public class ConditionObject implements Condition, java.io.Serializable

ConditionObject 实现了 Condition 接口和序列化接口,后者说明了该类型的对象可以进行序列化。而前者 Condition 接口,定义了一些行为能力:

public interface Condition {    void await() throws InterruptedException;    void awaitUninterruptibly();    long awaitNanos(long nanosTimeout) throws InterruptedException;    boolean await(long time, TimeUnit unit) throws InterruptedException;    boolean awaitUntil(Date deadline) throws InterruptedException;    void signal();    void signalAll();}

Condition 中定义的能力与 Java 的 Object 类中提供的同步相关方法(wait、notify 和 notifyAll) 代表的能力极为相似。前者提供了更丰富的等待方法。类比的角度来看,如果 Object 是配合 synchronized 关键字使用的,那么 Condition 就是用来配合基于 AQS 实现的锁来使用的接口。

可以将 Condition 的方法分为两组:等待和唤醒。

用于等待的方法

// 等待,当前线程在接到信号或被中断之前一直处于等待状态    void await() throws InterruptedException;// 等待,当前线程在接到信号之前一直处于等待状态,不响应中断void awaitUninterruptibly();//等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态 long awaitNanos(long nanosTimeout) throws InterruptedException;// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。// 此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0boolean await(long time, TimeUnit unit) throws InterruptedException;// 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态    boolean awaitUntil(Date deadline) throws InterruptedException;

用于唤醒的方法

// 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。void signal();// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。void signalAll();

ConditionObject

分析完 Condition ,继续来理解 ConditionObject。 ConditionObject 是 Condition 在 AQS 中的实现:

public class ConditionObject implements Condition, java.io.Serializable {        private transient ConditionNode firstWaiter;        private transient ConditionNode lastWaiter;    // ---- Signalling methods ----    // 移除一个或所有等待者并将其转移到同步队列。    private void doSignal(ConditionNode first, boolean all)    public final void signal()    public final void signalAll()    // ---- Waiting methods ----    // 将节点添加到条件列表并释放锁定。    private int enableWait(ConditionNode node)    // 如果最初放置在条件队列中的节点现在准备好重新获取同步队列,则返回 true。    private boolean canReacquire(ConditionNode node)     // 从条件队列中取消链接给定节点和其他非等待节点,除非已经取消链接。    private void unlinkCancelledWaiters(ConditionNode node)     // 实现不可中断的条件等待    public final void awaitUninterruptibly()    public final void await()    public final long awaitNanos(long nanosTimeout)    public final boolean awaitUntil(Date deadline)    public final boolean await(long time, TimeUnit unit)    //  ---- support for instrumentation ----    // 如果此条件是由给定的同步对象创建的,则返回 true。    final boolean isOwnedBy(AbstractQueuedSynchronizer sync)    // 查询是否有线程在此条件下等待。    protected final boolean hasWaiters()    // 返回在此条件下等待的线程数的估计值。    protected final int getWaitQueueLength()    // 返回一个集合,其中包含可能正在等待此 Condition 的那些线程。    protected final Collection<Thread> getWaitingThreads()}

ConditionObject 实现了 Condition 能力的基础上,拓展了对 ConditionNode 相关的操作,方法通过其用途可以划分为三组:

  • Signalling

  • Waiting

  • 其他方法

Signalling methods

        public final void signal() {            ConditionNode first = firstWaiter;            if (!isHeldExclusively())                throw new IllegalMonitorStateException();            if (first != null)                doSignal(first, false);        }        public final void signalAll() {            ConditionNode first = firstWaiter;            if (!isHeldExclusively())                throw new IllegalMonitorStateException();            if (first != null)                doSignal(first, true);        }

唤醒方法主要逻辑是通过 doSignal(ConditionNode first, boolean all) 实现的。doSignal 方法根据参数,进行一个 while 循环,

两个方法传递进来的都是头节点,也就是从 ConditionNode 双向链表的头节点开始遍历,如果第二个参数 all 设置为 false ,只执行一次遍历中逻辑。循环中的逻辑是:

// 最终都调用了这个方法private void doSignal(ConditionNode first, boolean all) {    while (first != null) {        // 取出 first 的下一个节点,设置为 next        ConditionNode next = first.nextWaiter;         // 如果 first 是链表中唯一的一个节点,设置 lastWaiter 为 null        if ((firstWaiter = next) == null) //             lastWaiter = null;        // 读取 first 的 status ,检查是否是 COND        if ((first.getAndUnsetStatus(COND) & COND) != 0) {             // first 处于 COND 状态,出队            enqueue(first);             // 通过 all 来判断是否将等待的线程都进行唤醒逻辑。            if (!all)                break;          }        first = next; // 循环指向下一个    }}

关键方法 enqueue(ConditionNode) 是 AQS 中的方法:

    final void enqueue(Node node) {        if (node != null) {            for (;;) {                // 获取尾节点                Node t = tail;                 // 避免不必要的内存屏障                node.setPrevRelaxed(t);                 if (t == null)                          // 空队列首先初始化一个头节点                    tryInitializeHead();                  else if (casTail(t, node)) { // 更新 tail 指针为 node (这里不是将 t = node)                    t.next = node; // 为节点 t 的 next 指针指向 node                    if (t.status < 0)  // t 的状态 < 0 一般代表后续节点需要运行了                        LockSupport.unpark(node.waiter);                    break;                }            }        }    }

可以看出 enqueue(ConditionNode) 中本质上是通过调用 LockSupport.unpark(node.waiter); 来唤醒线程的。

Waiting methods

对外提供的等待能力的方法包括:

    // 实现不可中断的条件等待    public final void awaitUninterruptibly()    public final void await()    public final long awaitNanos(long nanosTimeout)    public final boolean awaitUntil(Date deadline)    public final boolean await(long time, TimeUnit unit)

它们内部都用到了公共的逻辑:

    // 添加节点到 condition 列表并释放锁    private int enableWait(ConditionNode node)    private boolean canReacquire(ConditionNode node)     private void unlinkCancelledWaiters(ConditionNode node)

enableWait

        private int enableWait(ConditionNode node) {            if (isHeldExclusively()) { // 如果是当前线程持有锁资源                node.waiter = Thread.currentThread();  // 将节点的绑定的线程设置为当前线程                node.setStatusRelaxed(COND | WAITING); // 设置节点状态                ConditionNode last = lastWaiter;       // 获取 尾节点                if (last == null)                    firstWaiter = node;                // 如果列表为空, node 就是头节点                else                    last.nextWaiter = node;            // 否则,将尾节点的下一个节点设置为 node                lastWaiter = node;                     // 更新 lastWaiter 指针                int savedState = getState();           // 获取当前线程的同步状态                if (release(savedState))               // 在当前持有锁资源的线程尝试释放锁                    return savedState;            }            node.status = CANCELLED; // 当前线程未持有锁资源,更新 node 的状态为 CANCELLED            throw new IllegalMonitorStateException(); // 并抛出 IllegalMonitorStateException        }

这个方法对传入的节点插入到等待队列的队尾,并根据当前线程的状态进行了检查。关键方法的 release(int) :

    public final boolean release(int arg) {        if (tryRelease(arg)) { // 尝试释放锁资源            signalNext(head);  // 释放成功,唤醒下一个等待中的线程            return true;        }        return false;    }

唤醒给定节点的下一个节点(如果存在),通过调用 LockSupport.unpark(s.waiter) 唤醒节点对应的线程。

    private static void signalNext(Node h) {        Node s;        if (h != null && (s = h.next) != null && s.status != 0) {            s.getAndUnsetStatus(WAITING);            LockSupport.unpark(s.waiter);        }    }

canReacquire

检查传入的 node 是否在链表中,且不为头节点:

// 如果最初放置在条件队列中的节点现在准备好重新获取同步队列,则返回 true。private boolean canReacquire(ConditionNode node) {    // 检查传入的 node 是否在链表中,且不为头节点    return node != null && node.prev != null && isEnqueued(node);}
// in AQS final boolean isEnqueued(Node node) {    // 从 Node 双向链表尾部开始遍历,是否存在 node    for (Node t = tail; t != null; t = t.prev)        if (t == node)            return true;    return false;}

unlinkCancelledWaiters

        private void unlinkCancelledWaiters(ConditionNode node) {            // node 为空 / node 不是队尾 / node 是最后一个节点            if (node == null || node.nextWaiter != null || node == lastWaiter) {                ConditionNode w = firstWaiter, trail = null; // w = first , trail = null                // /从链表头节点开始遍历                while (w != null) {                     ConditionNode next = w.nextWaiter;  // 取出下一个节点                    if ((w.status & COND) == 0) {       // 当前节点的状态包含 COND                        w.nextWaiter = null;            // 当前节点的 next 设置为 null                         if (trail == null)              // 如果 trail 指针为空                            firstWaiter = next;         // firstWaiter 指向 next                        else                            trail.nextWaiter = next;    // trail 指针不为空,尾指针的 next 指向当前节点的下一个节点                         if (next == null)                            lastWaiter = trail; // 最后将 lastWaiter 设置为 trail (过滤后的 trail 链表插入到队尾)                    } else                        trail = w; // 头节点状态不是 COND,当前节点设置为 trail 指针。                    w = next; // 下一个循环                }             }        }

这个方法遍历 ConditionNode 队列,过滤掉状态不包含 COND 的节点。

对外提供的等待方法

上面三个方法是内部处理逻辑。而对外暴露的是以下五个方法:

    public final void awaitUninterruptibly()    public final void await()    public final long awaitNanos(long nanosTimeout)    public final boolean awaitUntil(Date deadline)    public final boolean await(long time, TimeUnit unit)

除了awaitUninterruptibly() ,其他方法所代表的能力和 Condition 接口中定义的所代表的能力基本一致。

awaitUninterruptibly

awaitUninterruptibly() 是用于实现不可中断的条件等待:

        public final void awaitUninterruptibly() {            ConditionNode node = new ConditionNode(); // 创建一个新的 node            int savedState = enableWait(node);        // 将这个新 node 插入,并返回 node 的状态            LockSupport.setCurrentBlocker(this);      // 设置 blocker            boolean interrupted = false, rejected = false;  // flag:中断和拒绝            while (!canReacquire(node)) {             // 当前线程关联的 node 不再等待队列                      if (Thread.interrupted())             // 尝试中断线程                    interrupted = true;                else if ((node.status & COND) != 0) {  // 中断线程不成功的情况下,如果 node 状态包含 COND                    // 尝试阻塞线程                    try {                        if (rejected)                              node.block(); // 实际上也是 LockSupport.park                        else                            ForkJoinPool.managedBlock(node);                     } catch (RejectedExecutionException ex) {                        rejected = true;    // 拒绝执行                    } catch (InterruptedException ie) {                        interrupted = true;   // 中断                    }                } else                    Thread.onSpinWait();        // 当前线程无法继续执行            }            // 不是队列中的唯一节点时执行下面逻辑            LockSupport.setCurrentBlocker(null);             node.clearStatus();   // 清除 node 的 status             acquire(node, savedState, false, false, false, 0L); // 【*】重点方法            if (interrupted)                Thread.currentThread().interrupt();        }

在这个方法中,首先讲解两个方法:

  • Thread.onSpinWait() 表示调用者暂时无法继续,直到其他活动发生一个或多个动作。 通过在自旋等待循环构造的每次迭代中调用此方法,调用线程向运行时指示它正忙于等待。 运行时可能会采取措施来提高调用自旋等待循环构造的性能。

  • ForkJoinPool.managedBlock(node) 则是通过 Blocker 来检查线程的运行状态,然后尝试阻塞线程。

最后是最关键的方法 acquire ,它的详细逻辑放到最后讲解, 这个方法的作用就是,当前线程进入等待后,需要将关联的线程开启一个自旋,挂起后能够持续去尝试获取锁资源。

await

        public final void await() throws InterruptedException {            if (Thread.interrupted())                throw new InterruptedException();            ConditionNode node = new ConditionNode();            int savedState = enableWait(node);            LockSupport.setCurrentBlocker(this); // for back-compatibility            boolean interrupted = false, cancelled = false, rejected = false;            while (!canReacquire(node)) {                if (interrupted |= Thread.interrupted()) {                    if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)                        break;              // else interrupted after signal                } else if ((node.status & COND) != 0) {                    try {                        if (rejected)                            node.block();                        else                            ForkJoinPool.managedBlock(node);                    } catch (RejectedExecutionException ex) {                        rejected = true;                    } catch (InterruptedException ie) {                        interrupted = true;                    }                } else                    Thread.onSpinWait();    // awoke while enqueuing            }            LockSupport.setCurrentBlocker(null);            node.clearStatus();            acquire(node, savedState, false, false, false, 0L);            if (interrupted) {                if (cancelled) {                    unlinkCancelledWaiters(node);                    throw new InterruptedException();                }                Thread.currentThread().interrupt();            }        }

await() 方法相较于 awaitUninterruptibly(),while 逻辑基本一致,最后多了一步 cancelled 状态检查,如果 cancelled = true ,调用 unlinkCancelledWaiters(node),去清理等待队列。

awaitNanos

awaitNanos(long) 在 await() 之上多了对超时时间的计算和处理逻辑:

        public final long awaitNanos(long nanosTimeout)                throws InterruptedException {            if (Thread.interrupted())                throw new InterruptedException();            ConditionNode node = new ConditionNode();            int savedState = enableWait(node);            long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;            long deadline = System.nanoTime() + nanos;            boolean cancelled = false, interrupted = false;            while (!canReacquire(node)) {                if ((interrupted |= Thread.interrupted()) ||                    (nanos = deadline - System.nanoTime()) <= 0L) { // 多了一个超时条件                    if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)                        break;                } else                    LockSupport.parkNanos(this, nanos);            }            node.clearStatus();            acquire(node, savedState, false, false, false, 0L);            if (cancelled) {                unlinkCancelledWaiters(node);                if (interrupted)                    throw new InterruptedException();            } else if (interrupted)                Thread.currentThread().interrupt();            long remaining = deadline - System.nanoTime(); // avoid overflow            return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE;        }

awaitUntil

awaitUntil(Date) 和 awaitNanos(long) 同理,只是将超时计算改成了日期计算:

            long abstime = deadline.getTime();            // ...            boolean cancelled = false, interrupted = false;            while (!canReacquire(node)) {                if ((interrupted |= Thread.interrupted()) ||                    System.currentTimeMillis() >= abstime) { // 时间检查                    if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)                        break;                } else                    LockSupport.parkUntil(this, abstime);            }

await(long, TimeUnit)

await(long, TimeUnit) 则是逻辑更加与 awaitNanos(long) 相似了, 只是多了一步计算 awaitNanos(long nanosTimeout) 中的参数 nanosTimeout 的操作:

long nanosTimeout = unit.toNanos(time);

acquire 方法

在 wait 方法组中,最终都会调用到这个逻辑:

    final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) {        Thread current = Thread.currentThread();        byte spins = 0, postSpins = 0;   // 在取消第一个线程时重试        boolean interrupted = false, first = false;        Node pred = null;                // 入队时节点的前一个指针                for (;;) {            if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) {                if (pred.status < 0) {                    cleanQueue();           // predecessor cancelled                    continue;                } else if (pred.prev == null) {                    Thread.onSpinWait();    // ensure serialization                    continue;                }            }            if (first || pred == null) {                boolean acquired;                try {                    if (shared)                        acquired = (tryAcquireShared(arg) >= 0);                    else                        acquired = tryAcquire(arg);                } catch (Throwable ex) {                    cancelAcquire(node, interrupted, false);                    throw ex;                }                if (acquired) {                    if (first) {                        node.prev = null;                        head = node;                        pred.next = null;                        node.waiter = null;                        if (shared)                            signalNextIfShared(node);                        if (interrupted)                            current.interrupt();                    }                    return 1;                }            }            if (node == null) {                 // allocate; retry before enqueue                if (shared)                    node = new SharedNode();                else                    node = new ExclusiveNode();            } else if (pred == null) {          // try to enqueue                node.waiter = current;                Node t = tail;                node.setPrevRelaxed(t);         // avoid unnecessary fence                if (t == null)                    tryInitializeHead();                else if (!casTail(t, node))                    node.setPrevRelaxed(null);  // back out                else                    t.next = node;            } else if (first && spins != 0) {                --spins;                        // reduce unfairness on rewaits                Thread.onSpinWait();            } else if (node.status == 0) {                node.status = WAITING;          // enable signal and recheck            } else {                long nanos;                spins = postSpins = (byte)((postSpins << 1) | 1);                if (!timed)                    LockSupport.park(this);                else if ((nanos = time - System.nanoTime()) > 0L)                    LockSupport.parkNanos(this, nanos);                else                    break;                node.clearStatus();                if ((interrupted |= Thread.interrupted()) && interruptible)                    break;            }        }        return cancelAcquire(node, interrupted, interruptible);    }

这个方法会在 Node 关联的线程让出锁资源后,开启一个死循环尝试通过 tryAcquire 尝试获取锁资源,最后如果超时或尝试次数超出限制,会通过 LockSupport.park 阻塞自身。

到此,关于“Java多线程并发AbstractQueuedSynchronizer怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Java多线程并发AbstractQueuedSynchronizer怎么使用

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Java多线程并发AbstractQueuedSynchronizer怎么使用

这篇文章主要介绍“Java多线程并发AbstractQueuedSynchronizer怎么使用”,在日常操作中,相信很多人在Java多线程并发AbstractQueuedSynchronizer怎么使用问题上存在疑惑,小编查阅了各式资料,
2023-07-02

Java多线程并发ReentrantLock怎么使用

这篇文章主要介绍“Java多线程并发ReentrantLock怎么使用”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Java多线程并发ReentrantLock怎么使用”文章能帮助大家解决问题。背景
2023-07-02

DelayQueue怎么在Java多线程并发开发中使用

这篇文章给大家介绍DelayQueue怎么在Java多线程并发开发中使用,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。Delayed,一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。此接口的实现必须定
2023-05-31

Java并发之怎么使用线程池

这篇文章主要介绍“Java并发之怎么使用线程池”,在日常操作中,相信很多人在Java并发之怎么使用线程池问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Java并发之怎么使用线程池”的疑惑有所帮助!接下来,请跟
2023-06-16

java多线程并发执行怎么实现

在Java中实现多线程的并发执行有多种方式,以下是其中的几种常见方法:1. 继承Thread类:创建一个继承自Thread类的子类,并重写其run()方法。然后创建多个该子类的实例,并调用start()方法来启动线程。```javaclas
2023-09-27

java怎么实现多线程并发执行

Java实现多线程并发执行的方式有两种:继承Thread类和实现Runnable接口。继承Thread类:定义一个类,继承Thread类,重写run()方法,在run()方法中写入线程执行的逻辑。创建线程对象,调用start()方法启动线
2023-10-25

Java多线程并发之ReentrantLock

这篇文章主要介绍了Java 多线程并发ReentrantLock,文中有非常详细的代码示例,对正在学习java的小伙伴们有非常好的帮助,需要的朋友可以参考下
2023-05-18

pytest多线程与多设备并发appium怎么使用

这篇文章主要介绍了pytest多线程与多设备并发appium怎么使用的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇pytest多线程与多设备并发appium怎么使用文章都会有所收获,下面我们一起来看看吧。1、a
2023-07-02

怎么在java中实现多线程高并发

这篇文章将为大家详细讲解有关怎么在java中实现多线程高并发,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。1.JMM数据原子操作read(读取)∶从主内存读取数据load(载入):将主内存读
2023-06-14

怎么在Java并发包中使用ThreadPoolExecutor线程池

这篇文章给大家介绍怎么在Java并发包中使用ThreadPoolExecutor线程池,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。一、线程池简介线程池的使用主要是解决两个问题:①当执行大量异步任务的时候线程池能够提供
2023-06-15

Java多线程并发、并行、线程与进程实例分析

本篇内容介绍了“Java多线程并发、并行、线程与进程实例分析”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!一、并发与并行并发:指两个或多个事
2023-07-02

理解Java多线程之并发编程

这篇文章主要介绍了理解Java多线程之并发编程的相关资料,需要的朋友可以参考下
2023-02-02

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录