本文的代码部分理解,建议结合视频:https://edu.csdn.net/course/play/25414/301467
首先分析读写锁的需求: 1、读锁状态下,可以继续加读锁,但是不能加写锁;如果有写锁在等待队列,后续请求的读锁也需要加到等待队列中。 2、写锁状态下,不能加其他锁。 3、读锁获得锁之后,需要通知后继节点中(在第一个写锁之前)的读锁。
如何实现呢? 1、第一、二点要在获取锁(tryAcquire)中控制; 2、第三点需要在读锁获得锁之后进行 3、写锁的获取、释放跟普通锁的获取、释放相同;读锁的获取、释放有自己的不同之处,所以实现 acquireShared,releaseShared方法
读锁加入等待队列
/** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) {//如果是头节点的下一个节点,可能在这个过程中,前面的x锁或读锁释放 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r);//设置新head,并且传递释放后续读锁 p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */ //设置新的头节点,并检查当前节点后是否又有新的读节点加入 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ //如果后面有等待的其他节点(propagate>0) // 或者 原来的头节点或新的头节点(即node)为待通知状态 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared())//如果当前节点后面的节点为共享锁节点 doReleaseShared();//释放后继共享锁的关键方法 } }释放后继读锁的关键方法
/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ //通知后继节点并保证传播 private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. * This proceeds in the usual way of trying to unparkSuccessor of head if it needs * signal. * But if it does not, status is set to PROPAGATE to ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added while we are doing this. * Also, unlike other uses of unparkSuccessor, we need to know if CAS to reset status fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) {//表示后面有等待节点 int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//通过cas将节点等待状态从待通知变为已获得锁,如果失败,表示有其他线程在进行了 continue; // loop to recheck cases unparkSuccessor(h);//unpack后续节点 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //如果有新的进程修改了head节点,会出现waitStatus为0,要将其改为PROPAGATE,来保证传播行为继续 continue; // loop on failed CAS } if (h == head) // loop if head changed break; //如果有新的进程修改了head节点,循环继续 } }这个部分是读写锁最难理解的部分,下面进行解释(此处感谢参考文档:https://segmentfault.com/a/1190000016447307): 假设此时写锁释放,一个读节点(称为node0)刚获得锁,等待队列(所有节点都是读节点)如下图所示:
node0动作: node0 unpark了node1,此时如果 h == head,则表示node1尚未获取锁,并将head改为自己; 如果 h != head,表示node1已经获得锁,并将head改为自己,此时继续循环,可以不等到node1来释放node2,直接自己去unpark node2。 此时队列如下图:
因为PROPAGATE的作用在于保证传播,而实际过程如下(此处propagate=tryAcquireShared方法将返回队列中等待节点的个数):
1) node0动作: 此时node2尚未设置head,head仍为node1,所以h==head,循环结束。
node1动作: 在setHeadAndPropagate时发现propagate>0,继续释放后续读锁,若此时node2将head更新为自己,队列如下图所示: ,node1将node2的waitStatus改为0,unpark node3,队列如下图所示:
此时node3尚未设置head,循环结束。
2)
node2动作: 在setHeadAndPropagate时发现propagate>0,继续释放后续读锁。若此时node3尚未设置head,不执行else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)),循环结束。
node3动作: 设置了新head,并在setHeadAndPropagate时发现propagate>0,继续释放后续读锁。此时队列如下图:
更新 node3的waitStatus为0,且unpark node4。
由以上描述可知释放后续读锁的动作没有中断,并且只有在当前head节点状态为SIGNAL的情况下才会unpark后继节点,将head设置成PROPAGATE并不会因此提早unpark后继节点,所以对释放速度也没有优化。 对node的waitStatus从PROPAGATE修改为SIGNAL只有当队列只有当前一个节点(没有等待节点),且新增申请读锁的情况,此时队列没有被写锁占有,且等待队列为空,所以直接就申请锁成功了。
jdk实现的ReentrantReadWriteLock比较复杂,现结合我自己实现的简单的MyReadWriteLock进行讲解:
首先,需要定义一个类MyReadWriteLock,它里面有ReadLock,WriteLock实例,它们的lock、unlock方法,分别通过AbstractQueuedSynchronizer实现子类的tryAcquireShared,tryReleaseShared,tryAcquire,tryRelease实现。 这几个方法分别怎么实现呢?返回去看需求:1、读锁状态下,可以继续加读锁,但是不能加写锁;但是如果有写锁在等待队列,后续的读锁也需要加到队列中。
tryAcquireShared方法
private final static int WRITE_MASK=1024; @Override protected int tryAcquireShared(int arg) { while(!isHeldByWriteLock()){//如果现在不是写锁状态 if(!getExclusiveQueuedThreads().isEmpty()){//如果有等待的写锁 return -1; } if(compareAndSetState(getState(),getState()+1)){//如果争抢成功 //apparentlyFirstQueuedIsExclusive //如果第一个等待的线程为写锁 if(writeLockAtTopOfWaitingQueue()){ return 0; } return getSharedQueuedThreads().size(); } } return -1; } private boolean writeLockAtTopOfWaitingQueue() { return !getExclusiveQueuedThreads().isEmpty() && getFirstQueuedThread()==getExclusiveQueuedThreads().iterator().next(); } protected boolean isHeldByWriteLock() { return getState()/WRITE_MASK>0; }其中isHeldByWriteLock是通过state的值来判断是否被写锁占有,这就涉及到tryAcquire的实现:
protected boolean tryAcquire(int arg) { //因为加写锁的时候,一定没有读锁占有 while(getState()==0){ if(compareAndSetState(0,WRITE_MASK)){ return true; } } return false; }最后的两个release方法就比较简单了:
@Override protected boolean tryRelease(int arg) { if(getExclusiveOwnerThread()==Thread.currentThread()){ compareAndSetState(WRITE_MASK,0); return true; } return false; } @Override protected boolean tryReleaseShared(int arg) { if(如果读线程列表中含有当前线程){ compareAndSetState(getState(),getState()-1); return true; } return false; }在这个基础上加上对锁重入的控制,并且将state的运算优化为位运算,就是java的ReentrantReadWriteLock实现。
本篇文章以abstractQueuedSynchronizer中的方法为基础分析读写锁的需求、实现方法;如果要深入理解相关代码的设计思路,需要结合ReadWriteLock的实际实现类,本文举了一个简单的例子,实际真正使用时需要使用jdk的实现:ReentrantReadWriteLock,以下是一篇介绍:https://blog.csdn.net/LiuRenyou/article/details/98312234
在分析、回放aqs相关执行流程的时候,用脑子有时候难以胜任,需要借助工具,比如泳道图、intellij 的调试功能;在此进行举例:
比如读写锁这一块,笔者怀疑PROPAGATE这个waitStatus值存在的必要性,便将doReleaseShared方法中那一句修改waitStatus的代码注释掉,然后进行调试。调试代码如下:
public static void main(String[] args) { ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock(); ReentrantReadWriteLock.ReadLock readLock=readWriteLock.readLock(); ReentrantReadWriteLock.WriteLock writeLock=readWriteLock.writeLock(); new Thread(new Runnable() { @Override public void run() { writeLock.lock(); writeLock.unlock(); } }).start(); new Thread(new Runnable() { @Override public void run() { readLock.lock(); // latch.countDown(); } }).start(); new Thread(new Runnable() { @Override public void run() { readLock.lock(); // latch.countDown(); } }).start(); new Thread(new Runnable() { @Override public void run() { readLock.lock(); System.out.println("haha"); } }).start(); }但是这里有一个问题:运行时使用的AbstractQueuedSynchronizer是jdk自带的实现,即使你在本intellij idea的模块src目录下作了覆盖,这涉及到类加载机制。一个解决办法是通过endorsed方式:
首先将AbstractQueuedSynchronizer打包成一个jar包,即在cd到src/main/java目录后:
javac java/util/concurrent/locks/AbstractQueuedSynchronizer.java jar -cvf myaqs.jar java/util/concurrent/locks/AbstractQueuedSynchronizer*.class然后在java的debug configurations的vm arguments处设置:
-Djava.endorsed.dirs=/Users/zhangyugu/IdeaProjects/practice-Code/testCode/src/main/java/
如此运行时使用的便是我们覆盖的AbstractQueuedSynchronizer类。