Java多线程——Lock锁机制与AQS同步器流程

it2022-05-05  173

Java多线程——Lock锁机制与AQS同步器

一、Lock锁总结

  既然已经有个synchronized锁机制,为什么还要引入一个Lock锁机制?

  1、Lock在synchronize基础上的改进

   ①synchronized是隐式获取释放锁,但是将锁的获取和释放固化了,虽然简化了同步的管理,但扩展性不好,而lock体系缺拥有了锁获取与释放的可操作性(显式获取释放锁)。

   ②synchronized内建锁有的功能Lock全有。而Lock体系拥有可中断的获取锁、超时获取锁、共享锁等(这些是内建锁不具备的特性)

   ③synchronized获取锁失败就进入阻塞状态(悲观锁机制),而lock获取锁失败就反复CAS自旋(乐观锁机制)。

   ④synchronized可作用于代码块和函数上,而Lock锁只能作用于代码块上。

  Lock锁机制与AQS同步器有什么关系?

  2、Lock接口与AQS的关系

   Lock接口的最主要实现类 ReentrantLock中所有的方法实际上都是调用了其静态内部类Sync中的方法。

   Sync继承了AbstractQueuedSynchronizer(AQS-“同步器”),也就是说,Lock锁的整个体系是基于AQS同步器实现的。

  Lock锁在可中断获取锁、超时获取锁、共享锁是怎么实现的?

  3、lock体系加强synchronized的功能

   ①独占锁特性

    a)获取锁时响应中断:acquireInterruptibly()

     1> 获取锁时响应中断原理与acquire()几乎一样,与之唯一的区别在于当方法parkAndCheckInterrupt()返回true时,表示线程阻塞时被中断,抛出中断异常后线程退出。
     2> 而acquire()中的acquireQueued()中若parkAndCheckInterrupt()返回true,只是将interrupt状态设置为true

    b)超时等待获取锁:tryAcquireNanos()

     该方法在三种情况下会返回
      1> 在超时时间内,当前线程成功获取到锁,返回true
      2> 当前线程在超时时间内被中断,线程抛出异常
      3> 超时时间结束,仍未获取到锁,线程退出返回false
      超时获取锁逻辑与可中断获取锁基本一致,唯一区别在于获取锁失败后,增加了一个时间处理,如果当前时间超过截止时间,线程不再等待,直接退出,返回false。否则将线程阻塞置为等待状态排队获取锁。

   ②独占(互斥)锁与共享锁的特点

    a)共享式访问资源时,其他共享式的访问均被允许,而独占式访问被阻塞。

    b)独占式访问资源时,同一时刻其他所有访问均被阻塞。


  既然AQS同步器是整个Lock体系的绝对核心,那到底什么是AQS同步器呢?

二、AQS同步器简介

  1、AQS(队列)同步器简介

   AQS(队列)同步器是用来构建锁以及其他同步组件的基础框架,它的实现主要是依赖一个int型的状态变量(锁的状态变量)以及通过一个FIFO队列共同构成同步队列。

  Lock锁或者其他同步组件(包括自定义同步组件)是如何实现AQS的?

  2、AQS的模板模式

    AQS使用模板方法模式,将一些与状态相关的核心方法开放给子类重写,而后AQS会使用子类重写的关于状态的方法进行线程的排队、阻塞以及唤醒等操作。

    同步器即支持独占锁,也支持共享锁。

  什么是独占锁?而什么又是共享锁?

   独占锁:同一时刻只能有一个线程获取到该锁,其他想要获取锁的线程只能处于同步队列中等待。(只有当前线程释放了锁,同步队列中的线程才能获取锁)

   共享锁:同一时刻有多个线程获取同步状态。

  AQS提供的模板方法大体分为哪些呢?

    AQS模板分类:独占式获取与释放同步状态、共享式获取与释放同步状态、查询同步队列中的等待线程情况。

  Lock锁到底和AQS同步器是什么关系呢?

  3、Lock锁与AQS同步器的关系

    Lock锁是面向使用者的,隐藏了锁的实现细节。 (使用者直接使用Lock锁进行同步操作即可)

    AQS同步器是面向锁的实现的,简化了锁的实现方式,屏蔽了同步状态管理,线程排队,等待,唤醒等操作。(Lock锁不需要管这些方法底层是怎么实现的,直接用这些方法实现同步语义即可,而在AQS同步器中会从底层将这些方法一一实现)

    任何一个自定义的锁类中一定有一个抽象类继承了AQS类。

  如何实现自定义锁(同步组件)

  4、自定义锁(同步组件):

    ①实现同步组件时推荐定义继承AQS的静态内存类,并重写需要的protected修饰的⽅法;

    ②同步组件语义的实现依赖于AQS的模板⽅法,⽽AQS模板⽅法⼜依赖于被AQS的⼦类所重写的⽅法。

  在自定义锁中只需要关注如何实现同步的语义,而具体同步状态管理、线程排队…等操作,直接交个AQS同步器进行管理,自定义锁时直接调用就好,不需要管实现细节。

    自定义锁的实现案例请参考:自定义锁


  AQS同步器的底层究竟是怎么实现的?

三、AQS同步器的工作流程

  1、AQS抽象类

    ①在同步组件(锁)中,AQS是最核心的部分,同步组件的实现依赖AQS提供的模板方法来实现同步组件语义。

    AQS实现了对同步状态的管理,以及对阻塞线程进行排队、等待通知等等底层实现。

    ②AQS核心组成:同步队列、独占锁的获取与释放、共享锁的获取与释放、可中断锁、超时锁。这一系列功能的实现依赖于AQS提供的模板方法。

  同步队列是什么?

  2、同步队列详解

    AQS内部依赖同步队列来完成同步状态的管理。同步队列:当共享资源被某个线程占有,其他请求该资源的线程会被阻塞,从而进入同步队列。

  同步队列是怎么实现的?

    同步队列是一个带有头尾节点的双向链表(队列),同步队列的节点是AQS内部的一个静态内部类Node,里面存放着waitStatus节点状态、前驱后继节点,以及当前结点包装的线程对象。

    同步队列的头节点是成功获取到同步状态的结点。

    重点说明一下节点状态值所代表的含义:

   int INITIAL = 0;   //初始状态(锁还没有被任何线程获取)    int CANCELLED = 1;    //当前节点从同步队列中取消    int SIGNAL = -1;    //当前节点的后继节点处于阻塞状态(当前节点拿到了同步状态,后继节点还在同步队列中进行等待),如果当前节点释放,同步状态会通知后继节点,使后继节点继续运行。    int CONDITON = -2;   //当前节点处于等待队列中,节点线程等待在Condition上,当其他线程对Condition调用signal()(相当于notify),该节点会从等待队列转移到同步队列中。     int PROPAGATE = -3;    //共享式同步状态会无条件的传播

  同步队列与AQS的关系是什么?

    AQS通过持有同步队列的头尾指针从而管理同步队列。操作包括:获取锁失败的线程进行入队操作,获取锁成功进行出队操作。

   设置尾结点是使用了CAS操作——多个线程都被阻塞,设置头结点不需要CAS操作——只会有一个线程同步成功的<成功的线程作为头结点>

  AQS的工作流程到底是什么?通过独占式同步状态(ReentrantLock锁)的获取与释放深入了解AQS的工作流程

  3、独占式同步状态(锁)的获取——acquire(int arg)

    ①首先,当前线程会通过CAS操作(compareAndSetState()方法)来尝试将同步状态改为1,如果成功则当前线程就获取到同步状态(锁)了,如果失败(获取同步状态失败),则会调用acquire()方法。

  什么时候才会调用acquire()方法

    也就是说,如果当前线程通过CAS成功获取到锁了,就直接退出了,而如果失败了,才会调用acquire()进一步尝试获取同步状态。

final void lock() { // 若获取独占锁成功,则直接返回 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // 若获取独占锁失败,调用acquire() acquire(1); }

    ②独占锁获取的主要逻辑就在acquire()中:

public final void acquire(int arg) { // 再次尝试获取同步状态,如果成功则⽅法直接返回 // 如果失败则先调⽤addWaiter()⽅法再调⽤acquireQueued()⽅法 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

  再次尝试获取同步状态

    a)调用自定义同步器实现的tryAcquire(int arg)方法,再次尝试获取锁的同步状态,该方法保证线程安全的获取同步状态。(同一时刻只能有一个线程成功获取同步状态)

     1> 如果同步状态获取成功,返回true,acquire()直接退出。
     2> 如果同步状态获取失败,返回false,则构造同步节点(独占式Node.EXCLUSIVE)
private Node addWaiter(Node mode) { // 将当前线程包装称为Node类型 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 当前尾节点不为空(当前队列不为空) if (pred != null) { // 将当前线程以尾插的⽅式插⼊同步队列中 // 需要采用CAS操作保证线程安全 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 当前尾结点为空(当前队列为空)或CAS尾插失败 enq(node); return node; }

  获取同步状态失败,同步队列入队操作

    b)接着调用addWaiter(Node node)方法,先将a)构造的同步节点尾插到同步队列中。

     1> 如果当前同步队列的尾结点不为null,则直接采用compareAndSetTail()方法将该节点尾插入同步队列。
     2> 如果当前当前同步队列的尾结点为null(说明当前队列为null),或将该节点CAS尾插入同步队列失败,则会调用enq(Node node)方法
// 能调用到enq()方法有两种可能 // 1、队列为空(enq()创建队列) // 2、CAS尾插失败(enq()反复尾插) private Node enq(final Node node) { for (;;) { Node t = tail; // 若队列为空(尾结点为空),则创建队列 if (t == null) { // Must initialize // 初始化头结点 if (compareAndSetHead(new Node())) tail = head; } else { // 若队列不为空,说明是CAS尾插失败 // 则反复重试,直到尾插成功 // 成功后通过return跳出该“死循环” node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

  同步队列为空或CAS尾插失败

    b+)调用enq(Node node)方法,直到将当前节点插入到同步队列成功为止。

     1> 若当前队列尾结点为null,初始化当前队列,调用compareAndSetHead(new Node()⽅法,完成链式队列的头结点的初始化。
     2>若CAS尾插失败,则反复自旋重试,直到成功为止。
/** 1、如果当前节点的前驱节点是头节点,并且能够获得同步状态的话 ,当前线程能够获得锁该⽅法执⾏结束退出; 2、 获取锁失败的话,先将节点状态设置成SIGNAL, 然后调⽤LookSupport.park⽅法使得当前线程阻塞。 */ final boolean acquireQueued(final Node node, int arg){ boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 当前节点获取独占式锁的条件: // 当前节点的前驱节点是头结点(p==head) // 当前节点获取到了同步状态 if (p == head && tryAcquire(arg)) { // 若当前节点获取到了同步状态。 // 当前节点作为同步队列头结点 setHead(node); // 释放前驱节点 p.next = null; // help GC failed = false; return interrupted; } // 获取同步状态失败 // 线程进⼊等待状态等待获取独占锁 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

  入队成功,排队获取同步状态(锁)

    c)调用acquireQueued(Node node,int arg)方法获取锁的状态

     acquireQueued()获取锁成功的条件:当前节点前驱为头结点,并且当前节点成功获得同步状态(出队的条件)
     acquireQueued()方法整体上是在自旋,当前驱节点不是头结点或前驱节点是头结点但当前节点未获取到同步状态,此时当前线程进入等待(阻塞)状态了,则反复自旋尝试获取同步状态。
     1>若当前节点获取锁成功,就调用setHead(node);设置为队列的头结点,再将之前的头结点与同步队列断开(方便GC回收)

  排队获取锁失败(前驱节点不是头节点||当前节点没获取到同步状态)

     2> 若获取锁失败
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
      首先,调用shouldParkAfterFailedAcquire(Node prev,Node node),使用CAS将前驱节点状态置为SIGNAL(表示需要将当前节点阻塞),若compareAndSetWaitStatus失败,不断自旋直到前驱节点状态置为SIGNAL为止。(acquireQueued中死循环)
      只有将前驱节点的状态设置为SINGAL,当前驱节点执行完就会通知当前节点。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
      然后,将前驱节点状态置为SIGNAL后,调用parkAndCheckInterrupt(),该方法中会调用LockSupport.park(this);将当前线程阻塞。(当前线程安安静静的在同步队列中等待着被通知)

    实际上,acquireQueued()在⾃旋过程中主要完成了两件事情:

     1>如果当前节点的前驱节点是头节点,并且能够获得同步状态的话,当前线程能够获得锁该⽅法执⾏结束退出。
     2>获取锁失败的话,先将节点状态设置成SIGNAL,然后调⽤LookSupport.park⽅法使得当前线程阻塞。

    ③独占锁获取完整流程图

  4、独占锁的释放——release()

   ①unlock()方法实际是在调用AQS提供的release()模板方法

public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }

    release()方法是unlock()方法的具体实现。

protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }

  尝试释放同步状态

    a)如果同步状态释放成功,则首先获取头结点,当头结点不为null且头结点不是初始状态时,会调用unparkSuccessor()方法。

private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }

  释放状态成功,唤醒后继节点(所包装的线程)

    b)在unparkSuccessor()方法中,⾸先获取头节点的后继节点,当后继节点不为null的时候会调用LockSupport.unpark()方法唤醒后继节点包装的线程。

   ②每一次锁释放后就会将当前头结点(原本获取到锁的结点)移出同步队列,并唤醒队列中该节点的后继节点所包装的线程。

  5、独占式锁获取与释放总结

   ①线程获取锁失败,将线程通过调用addWaiter()封装成Node节点进行入队操作。addWaiter()中方法enq()完成对同步队列的头结点初始化以及CAS尾插失败后的重试处理。

   ②入队之后排队获取锁的核心方法acquireQueued(),节点排队获取锁是一个自旋过程。节点获取锁的前提条件,当且仅当,当前节点的前驱节点为头结点并且该节点成功获取同步状态(tryAcquire返回true)时。

    a)若前驱节点出队,该节点作为头结点并且该节点引用的线程获取到锁。

    b)否则不满足条件时,会不断自旋将前驱节点的状态置为SIGNAL(当前节点需要阻塞),而后调用LockSupport.park()将当前线程阻塞。

   ③释放锁时会将当前头结点(原本获取到锁的结点)移出同步队列,并唤醒队列中该节点的后继节点所包装的线程(若后继节点不为null)

  Lock体系中可中断获取锁、超时等待获取锁是怎么实现的

  6、Lock自带的可中断获取锁、超时等待获取锁

   ①可中断获取锁:调用lockInterruptibly(底层是AQS的acquireInterruptibly),其他逻辑与acquire()一模一样,唯一区别就是当parkAndCheckInterrupt()返回true时即线程阻塞时该线程被中断,代码抛出被中断异常。(acquire()是只设置中断标志)

   ②超时等待获取锁:超时等待获取锁的概念及返回的时机就不赘述,请参考:超时等待获取锁。调用tryLock(底层是AQS的doAcquireNanos)在可中断获取锁的基础上增加了时间的设置,死循环中获取锁失败后,如果达到设置的超时时间,则直接返回false,否则,才会进一步执行线程阻塞等待、响应中断异常等代码。


最新回复(0)