首先介绍一下什么是条件锁。以ArrayBlockingQueue为例子,一个线程要从队列中取元素,另一个线程要从队列中放元素,前者需要队列非空,而后者需要队列非满,这里的非空和非满就是两个条件,而且条件和动作之间需要有对对象锁的持有(否则条件改变,比如非空条件不满足了,此时再去取元素就会出错)。所以条件锁的含义是:某一个动作的完成需要满足某个条件,这个条件依赖于一个对象(锁主体,比如这里的队列)。 下面通过实际场景导出条件锁的实现方法。
从上图我们可以看出条件锁需要一个等待方法和一个通知方法,还有一种是通知所有等待节点的方法,适用于线程2放入多个元素,通知多个等待取出元素的线程的情况。在这里,非空、非满的条件都依赖于队列,所以锁也是基于队列这个对象的锁。非空和非满都可能会有多个等待线程,所以需要维护自己的等待队列。
针对条件锁的情况,AbstractQueuedSynchronizer中的类ConditionObject实现了Condition接口,ConditionObject中主要属性有firstWaiter和lastWaiter,用来做等待的节点列表。主要方法有await和signal、signalAll。
将waiter节点指针后移,并将该节点加入到等待队列中,在没有其他条件等待中的节点(如果有,则让该节点先被其他线程unpack)的情况下,unpack该节点的线程
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { //在signal之前要先通过lock获取锁,所以会持有该锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } /** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ //将waiter节点指针后移,并将该节点加入到等待队列中,并unpack该节点的线程 private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ //如果该节点状态为已取消 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node);//将waiter,即condition等待队列中的对象插入到待唤醒队列的末尾,并返回原先的尾节点 int ws = p.waitStatus; //如果该节点已经取消竞争,或者未被成功被设置成被待通知状态(则表示该节点的waitStatus不正确),都unpack 当前节点的线程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }这里最后的unpark的判断条件可能会有疑问,这个我们在讲完所有代码后结合流程图进行讲解。
interrupt 这里也是有点小绕的,结合下面的方法英文注释,我们可以知道如果在其他线程signal之前就被打断了,则我们最后处理是直接抛出打断异常,否则仅仅是对该线程的打断标志做标注。
/** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } /** * Transfers node, if necessary, to sync queue after a cancelled wait. * Returns true if thread was cancelled before being signalled. * * @param node the node * @return true if cancelled before the node was signalled */ final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */ while (!isOnSyncQueue(node)) Thread.yield(); return false; }以ArrayBlockingQueue为例,第一个线程要从队列中取元素,第二个线程从队列中拿元素,后面两个线程也在争取锁。
notempty在acquireQueued时队列的情况如下:
在notempty.signal的时候,队列里还有其他节点在等待,notempty在acquireQueued时,需要等线程2 unlock才能成功;所以要判断原先的尾结点waitStatus为已取消或者不正确时才进行通知,因为此时中间节点的修复可能需要花一点时间。
如同排它锁的实现,在条件锁的实现中,waitStatus在节点的维护以及是否通知、是否尝试获取锁等的判断中都起到了判断依据的作用。新建条件等待node时候,waitStatus = CONDITION,在通知其获取锁之前将node的 waitStatus修改为0,然后放入到AQS的等待队列中。