AbstractQueuedSynchronizer应用之读写锁

it2022-05-05  229

本文的代码部分理解,建议结合视频:https://edu.csdn.net/course/play/25414/301467

思路分析

首先分析读写锁的需求: 1、读锁状态下,可以继续加读锁,但是不能加写锁;如果有写锁在等待队列,后续请求的读锁也需要加到等待队列中。 2、写锁状态下,不能加其他锁。 3、读锁获得锁之后,需要通知后继节点中(在第一个写锁之前)的读锁。

如何实现呢? 1、第一、二点要在获取锁(tryAcquire)中控制; 2、第三点需要在读锁获得锁之后进行 3、写锁的获取、释放跟普通锁的获取、释放相同;读锁的获取、释放有自己的不同之处,所以实现 acquireShared,releaseShared方法

代码讲解

1、acquireShared

 /**      * Acquires in shared mode, ignoring interrupts.  Implemented by      * first invoking at least once {@link #tryAcquireShared},      * returning on success.  Otherwise the thread is queued, possibly      * repeatedly blocking and unblocking, invoking {@link      * #tryAcquireShared} until success.      *      * @param arg the acquire argument.  This value is conveyed to      *        {@link #tryAcquireShared} but is otherwise uninterpreted      *        and can represent anything you like.      */     public final void acquireShared(int arg) {         //用返回值表示是否成功获取锁,<0表示失败;>=0 表示成功,其中 =0表示没有需要释放的后继读锁 >0表示有需要释放的后继读锁         if (tryAcquireShared(arg) < 0)             doAcquireShared(arg);     }

  读锁加入等待队列

/**      * Acquires in shared uninterruptible mode.      * @param arg the acquire argument      */     private void doAcquireShared(int arg) {         final Node node = addWaiter(Node.SHARED);         boolean failed = true;         try {             boolean interrupted = false;             for (;;) {                 final Node p = node.predecessor();                 if (p == head) {//如果是头节点的下一个节点,可能在这个过程中,前面的x锁或读锁释放                     int r = tryAcquireShared(arg);                     if (r >= 0) {                         setHeadAndPropagate(node, r);//设置新head,并且传递释放后续读锁                         p.next = null; // help GC                         if (interrupted)                             selfInterrupt();                         failed = false;                         return;                     }                 }                 if (shouldParkAfterFailedAcquire(p, node) &&                         parkAndCheckInterrupt())                     interrupted = true;             }         } finally {             if (failed)                 cancelAcquire(node);         }     }  /**      * Sets head of queue, and checks if successor may be waiting      * in shared mode, if so propagating if either propagate > 0 or      * PROPAGATE status was set.      *      * @param node the node      * @param propagate the return value from a tryAcquireShared      */     //设置新的头节点,并检查当前节点后是否又有新的读节点加入     private void setHeadAndPropagate(Node node, int propagate) {         Node h = head; // Record old head for check below         setHead(node);         /*          * Try to signal next queued node if:          *   Propagation was indicated by caller,          *     or was recorded (as h.waitStatus either before          *     or after setHead) by a previous operation          *     (note: this uses sign-check of waitStatus because          *      PROPAGATE status may transition to SIGNAL.)          * and          *   The next node is waiting in shared mode,          *     or we don't know, because it appears null          *          * The conservatism in both of these checks may cause          * unnecessary wake-ups, but only when there are multiple          * racing acquires/releases, so most need signals now or soon          * anyway.          */         //如果后面有等待的其他节点(propagate>0)         // 或者 原来的头节点或新的头节点(即node)为待通知状态         if (propagate > 0 || h == null || h.waitStatus < 0 ||                 (h = head) == null || h.waitStatus < 0) {             Node s = node.next;             if (s == null || s.isShared())//如果当前节点后面的节点为共享锁节点                 doReleaseShared();//释放后继共享锁的关键方法         }     }

2、releaseShared

 /**      * Releases in shared mode.  Implemented by unblocking one or more      * threads if {@link #tryReleaseShared} returns true.      *      * @param arg the release argument.  This value is conveyed to      *        {@link #tryReleaseShared} but is otherwise uninterpreted      *        and can represent anything you like.      * @return the value returned from {@link #tryReleaseShared}      */     public final boolean releaseShared(int arg) {         if (tryReleaseShared(arg)) {//读锁的获取不需要其他读锁的释放来进行通知,但是写锁的获取需要             doReleaseShared();             return true;         }         return false;     }

释放后继读锁的关键方法

    /**      * Release action for shared mode -- signals successor and ensures      * propagation. (Note: For exclusive mode, release just amounts      * to calling unparkSuccessor of head if it needs signal.)      */     //通知后继节点并保证传播     private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. * This proceeds in the usual way of trying to unparkSuccessor of head if it needs * signal. * But if it does not, status is set to PROPAGATE to ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added while we are doing this. * Also, unlike other uses of unparkSuccessor, we need to know if CAS to reset status fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) {//表示后面有等待节点 int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//通过cas将节点等待状态从待通知变为已获得锁,如果失败,表示有其他线程在进行了 continue; // loop to recheck cases unparkSuccessor(h);//unpack后续节点 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //如果有新的进程修改了head节点,会出现waitStatus为0,要将其改为PROPAGATE,来保证传播行为继续 continue; // loop on failed CAS } if (h == head) // loop if head changed break; //如果有新的进程修改了head节点,循环继续 } }

这个部分是读写锁最难理解的部分,下面进行解释(此处感谢参考文档:https://segmentfault.com/a/1190000016447307): 假设此时写锁释放,一个读节点(称为node0)刚获得锁,等待队列(所有节点都是读节点)如下图所示:

h == head 的作用:

node0动作: node0 unpark了node1,此时如果 h == head,则表示node1尚未获取锁,并将head改为自己; 如果 h != head,表示node1已经获得锁,并将head改为自己,此时继续循环,可以不等到node1来释放node2,直接自己去unpark node2。 此时队列如下图:

else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))没有作用:

因为PROPAGATE的作用在于保证传播,而实际过程如下(此处propagate=tryAcquireShared方法将返回队列中等待节点的个数):

1) node0动作: 此时node2尚未设置head,head仍为node1,所以h==head,循环结束。

node1动作:  在setHeadAndPropagate时发现propagate>0,继续释放后续读锁,若此时node2将head更新为自己,队列如下图所示: ,node1将node2的waitStatus改为0,unpark node3,队列如下图所示:

此时node3尚未设置head,循环结束。

2)

node2动作:  在setHeadAndPropagate时发现propagate>0,继续释放后续读锁。若此时node3尚未设置head,不执行else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)),循环结束。

node3动作: 设置了新head,并在setHeadAndPropagate时发现propagate>0,继续释放后续读锁。此时队列如下图:

更新 node3的waitStatus为0,且unpark node4。

由以上描述可知释放后续读锁的动作没有中断,并且只有在当前head节点状态为SIGNAL的情况下才会unpark后继节点,将head设置成PROPAGATE并不会因此提早unpark后继节点,所以对释放速度也没有优化。 对node的waitStatus从PROPAGATE修改为SIGNAL只有当队列只有当前一个节点(没有等待节点),且新增申请读锁的情况,此时队列没有被写锁占有,且等待队列为空,所以直接就申请锁成功了。

ReadWriteLock实现

jdk实现的ReentrantReadWriteLock比较复杂,现结合我自己实现的简单的MyReadWriteLock进行讲解:

首先,需要定义一个类MyReadWriteLock,它里面有ReadLock,WriteLock实例,它们的lock、unlock方法,分别通过AbstractQueuedSynchronizer实现子类的tryAcquireShared,tryReleaseShared,tryAcquire,tryRelease实现。 这几个方法分别怎么实现呢?返回去看需求:1、读锁状态下,可以继续加读锁,但是不能加写锁;但是如果有写锁在等待队列,后续的读锁也需要加到队列中。

tryAcquireShared方法

private final static int WRITE_MASK=1024; @Override protected int tryAcquireShared(int arg) { while(!isHeldByWriteLock()){//如果现在不是写锁状态 if(!getExclusiveQueuedThreads().isEmpty()){//如果有等待的写锁 return -1; } if(compareAndSetState(getState(),getState()+1)){//如果争抢成功 //apparentlyFirstQueuedIsExclusive //如果第一个等待的线程为写锁 if(writeLockAtTopOfWaitingQueue()){ return 0; } return getSharedQueuedThreads().size(); } } return -1; } private boolean writeLockAtTopOfWaitingQueue() { return !getExclusiveQueuedThreads().isEmpty() && getFirstQueuedThread()==getExclusiveQueuedThreads().iterator().next(); } protected boolean isHeldByWriteLock() { return getState()/WRITE_MASK>0; }

其中isHeldByWriteLock是通过state的值来判断是否被写锁占有,这就涉及到tryAcquire的实现:

protected boolean tryAcquire(int arg) { //因为加写锁的时候,一定没有读锁占有 while(getState()==0){ if(compareAndSetState(0,WRITE_MASK)){ return true; } } return false; }

最后的两个release方法就比较简单了:

@Override protected boolean tryRelease(int arg) { if(getExclusiveOwnerThread()==Thread.currentThread()){ compareAndSetState(WRITE_MASK,0); return true; } return false; } @Override protected boolean tryReleaseShared(int arg) { if(如果读线程列表中含有当前线程){ compareAndSetState(getState(),getState()-1); return true; } return false; }

在这个基础上加上对锁重入的控制,并且将state的运算优化为位运算,就是java的ReentrantReadWriteLock实现。

本篇文章以abstractQueuedSynchronizer中的方法为基础分析读写锁的需求、实现方法;如果要深入理解相关代码的设计思路,需要结合ReadWriteLock的实际实现类,本文举了一个简单的例子,实际真正使用时需要使用jdk的实现:ReentrantReadWriteLock,以下是一篇介绍:https://blog.csdn.net/LiuRenyou/article/details/98312234

对于jdk系统类的调试方法

在分析、回放aqs相关执行流程的时候,用脑子有时候难以胜任,需要借助工具,比如泳道图、intellij 的调试功能;在此进行举例:

比如读写锁这一块,笔者怀疑PROPAGATE这个waitStatus值存在的必要性,便将doReleaseShared方法中那一句修改waitStatus的代码注释掉,然后进行调试。调试代码如下:

public static void main(String[] args) { ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock(); ReentrantReadWriteLock.ReadLock readLock=readWriteLock.readLock(); ReentrantReadWriteLock.WriteLock writeLock=readWriteLock.writeLock(); new Thread(new Runnable() { @Override public void run() { writeLock.lock(); writeLock.unlock(); } }).start(); new Thread(new Runnable() { @Override public void run() { readLock.lock(); // latch.countDown(); } }).start(); new Thread(new Runnable() { @Override public void run() { readLock.lock(); // latch.countDown(); } }).start(); new Thread(new Runnable() { @Override public void run() { readLock.lock(); System.out.println("haha"); } }).start(); }

但是这里有一个问题:运行时使用的AbstractQueuedSynchronizer是jdk自带的实现,即使你在本intellij idea的模块src目录下作了覆盖,这涉及到类加载机制。一个解决办法是通过endorsed方式:

首先将AbstractQueuedSynchronizer打包成一个jar包,即在cd到src/main/java目录后: 

javac java/util/concurrent/locks/AbstractQueuedSynchronizer.java jar -cvf myaqs.jar java/util/concurrent/locks/AbstractQueuedSynchronizer*.class

然后在java的debug configurations的vm arguments处设置:

-Djava.endorsed.dirs=/Users/zhangyugu/IdeaProjects/practice-Code/testCode/src/main/java/

如此运行时使用的便是我们覆盖的AbstractQueuedSynchronizer类。

 


最新回复(0)