AbstractQueuedSynchronizer应用之条件锁

it2022-05-05  277

概念导入

       首先介绍一下什么是条件锁。以ArrayBlockingQueue为例子,一个线程要从队列中取元素,另一个线程要从队列中放元素,前者需要队列非空,而后者需要队列非满,这里的非空和非满就是两个条件,而且条件和动作之间需要有对对象锁的持有(否则条件改变,比如非空条件不满足了,此时再去取元素就会出错)。所以条件锁的含义是:某一个动作的完成需要满足某个条件,这个条件依赖于一个对象(锁主体,比如这里的队列)。         下面通过实际场景导出条件锁的实现方法。

       从上图我们可以看出条件锁需要一个等待方法和一个通知方法,还有一种是通知所有等待节点的方法,适用于线程2放入多个元素,通知多个等待取出元素的线程的情况。在这里,非空、非满的条件都依赖于队列,所以锁也是基于队列这个对象的锁。非空和非满都可能会有多个等待线程,所以需要维护自己的等待队列。

      针对条件锁的情况,AbstractQueuedSynchronizer中的类ConditionObject实现了Condition接口,ConditionObject中主要属性有firstWaiter和lastWaiter,用来做等待的节点列表。主要方法有await和signal、signalAll。

代码分析

1、signal

将waiter节点指针后移,并将该节点加入到等待队列中,在没有其他条件等待中的节点(如果有,则让该节点先被其他线程unpack)的情况下,unpack该节点的线程 

/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { //在signal之前要先通过lock获取锁,所以会持有该锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }          /**          * Removes and transfers nodes until hit non-cancelled one or          * null. Split out from signal in part to encourage compilers          * to inline the case of no waiters.          * @param first (non-null) the first node on condition queue          */          //将waiter节点指针后移,并将该节点加入到等待队列中,并unpack该节点的线程         private void doSignal(Node first) {             do {                 if ( (firstWaiter = first.nextWaiter) == null)                     lastWaiter = null;                 first.nextWaiter = null;             } while (!transferForSignal(first) &&                     (first = firstWaiter) != null);         }    /**      * Transfers a node from a condition queue onto sync queue.      * Returns true if successful.      * @param node the node      * @return true if successfully transferred (else the node was      * cancelled before signal)      */     final boolean transferForSignal(Node node) {         /*          * If cannot change waitStatus, the node has been cancelled.          */          //如果该节点状态为已取消         if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))             return false;         /*          * Splice onto queue and try to set waitStatus of predecessor to          * indicate that thread is (probably) waiting. If cancelled or          * attempt to set waitStatus fails, wake up to resync (in which          * case the waitStatus can be transiently and harmlessly wrong).          */         Node p = enq(node);//将waiter,即condition等待队列中的对象插入到待唤醒队列的末尾,并返回原先的尾节点         int ws = p.waitStatus;         //如果该节点已经取消竞争,或者未被成功被设置成被待通知状态(则表示该节点的waitStatus不正确),都unpack 当前节点的线程         if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))             LockSupport.unpark(node.thread);         return true;     }

这里最后的unpark的判断条件可能会有疑问,这个我们在讲完所有代码后结合流程图进行讲解。

2、signalAll

 /**          * Moves all threads from the wait queue for this condition to          * the wait queue for the owning lock.          *          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}          *         returns {@code false}          */         public final void signalAll() {             if (!isHeldExclusively())                 throw new IllegalMonitorStateException();             Node first = firstWaiter;             if (first != null)                 doSignalAll(first);         }          /**          * Removes and transfers all nodes.          * @param first (non-null) the first node on condition queue          */         private void doSignalAll(Node first) {             lastWaiter = firstWaiter = null;             //遍历所有的waiter,所有节点的线程都unpack             do {                 Node next = first.nextWaiter;                 first.nextWaiter = null;                 transferForSignal(first);                 first = next;             } while (first != null);         }

3、await

/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { //在await之前会先通过lock获取锁,也就是没有其他资源占有锁时 if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //增加等待节点 int savedState = fullyRelease(node); //进行一次release,因为此时其他地方也可能在尝试获取锁,这里让他们成功 int interruptMode = 0; while (!isOnSyncQueue(node)) { //如果不在待通知队列中 LockSupport.park(this);//则等待 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //此时node在等待队列中 //下面走获取锁流程获得一个锁,在await之后再unlock掉 //如果acquireQueued结果表示当前线程需要interrupt,值也是REINTERRUPT if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

interrupt 这里也是有点小绕的,结合下面的方法英文注释,我们可以知道如果在其他线程signal之前就被打断了,则我们最后处理是直接抛出打断异常,否则仅仅是对该线程的打断标志做标注。

/** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } /** * Transfers node, if necessary, to sync queue after a cancelled wait. * Returns true if thread was cancelled before being signalled. * * @param node the node * @return true if cancelled before the node was signalled */ final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */ while (!isOnSyncQueue(node)) Thread.yield(); return false; }

4、流程串讲

以ArrayBlockingQueue为例,第一个线程要从队列中取元素,第二个线程从队列中拿元素,后面两个线程也在争取锁。

notempty在acquireQueued时队列的情况如下:

在notempty.signal的时候,队列里还有其他节点在等待,notempty在acquireQueued时,需要等线程2 unlock才能成功;所以要判断原先的尾结点waitStatus为已取消或者不正确时才进行通知,因为此时中间节点的修复可能需要花一点时间。

关键点总结

如同排它锁的实现,在条件锁的实现中,waitStatus在节点的维护以及是否通知、是否尝试获取锁等的判断中都起到了判断依据的作用。新建条件等待node时候,waitStatus = CONDITION,在通知其获取锁之前将node的 waitStatus修改为0,然后放入到AQS的等待队列中。

 


最新回复(0)