我们知道juc(java.util.concurrent)包下有很多实用的类,提供了很多并发工具,例如线程池、原子类、并发工具、信号量工具、锁等,可以说基本实现都为悲观锁,底层原理基本都使用了AQS(AbstractQueuedSynchronizer),AQS不是一种概念,是并发中实打实的工具类。本篇文章针对AQS做解析。
AQS
AQS是多线程访问共享资源的同步器框架。AQS的资源可以是独占的也可以是共享的。我们先来简单看一下它的使用方式和ApI(因为是抽象类,是不能直接使用的),下图是AQS的整体脉络。
AQS核心就是一个状态值state,同时维护了一个线程的阻塞队列,队列的节点为有两种状态:SHARED(共享)和EXCLUSIVE(独占),节点状态有五种:
CANCELLED(1):取消状态,当线程不再希望获取锁时,设置为取消状态
SIGNAL(-1):当前节点的后继者处于等待状态,当前节点的线程如果释放或取消了同步状态,通知后继节点
CONDITION(-2):等待队列的等待状态,当调用signal()时,进入同步队列
PROPAGATE(-3):共享模式,同步状态的获取的可传播状态
0:初始状态
直接理解AQS其实比较困难,我们可以先通过ReentrantLock的实现来理解。
从ReentrantLock理解AQS
ReentrantLock大家都不陌生,最常见的API就是lock unlock,它其实就是基于AQS实现的,Lock中有一个子类Sync继承了AQS,又有Sync的子类FairSync和NonFairSync对应着公平所和非
在创建对象时初始化了相应的对象:
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
lock()
然后让我们看lock的实现:
public void lock() { sync.lock(); } //fair final void lock() { acquire(1); } //NonFail final void lock() { //CAS设置值,如果成功表示当前无锁直接上锁并设置独占线程 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
当非
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
内部依次执行了
tryAcquire
acquireQueued(addWaiter(Node.EXCLUSIVE),1);
selfInterrupt()
第一步:tryAcquire可以的话直接结束流程
//nonFair tryAcquire final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { //暂存线程用于重入 setExclusiveOwnerThread(current); return true; } } //可重入的体现 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // tryAcquire protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //hasQueuedPredecessors 查询是否有线程等待时间更久 用于公平锁 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //暂存线程用于重入 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
第二步:当获取不到锁或是可重入锁时,就会继续执行addWaiter():
//传入的是锁的状态,共享还是独占,根据此创建队列节点并放于队列尾 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //走到这里代表没有成功入队(空队伍或是乐观锁入队冲突) 进行自旋操作 enq(node); return node; } //自旋操作 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
第三步:经过第二步,当前线程已经入队列,对入队列的节点执行acquireQueued,主要是等待获取锁的机会:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //自旋,要么获取锁要么中断 for (;;) { //p是node前一个节点 final Node p = node.predecessor(); //如果node前一个节点是p,则轮询等待 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //获取锁失败/p不是头节点 park语义为等待 //前驱节点状态位SIGNAL则等待,防止无限循环浪费资源 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) //设置节点状态为CLOSE 1 cancelAcquire(node); } } //检查前驱节点是否是信号就绪状态 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //前驱节点为信号待命状态(就绪通知后续节点,同时当前节点线程可以wait),则可返回true if (ws == Node.SIGNAL) return true; //前驱节点为取消状态,移除 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; //CAS标识前驱为信号待命 } else {sad compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } //先标记为false,下次轮询到自然检查 return false; } //阻塞并返回线程的中断状态 private final boolean parkAndCheckInterrupt() { //进入waiting状态 LockSupport.park(this); return Thread.interrupted(); }
可以看出整体的流程如下:
快速获取锁,当前没有线程执行的时候直接获取锁
尝试获取锁,当没有线程执行或是当前线程(因为可重入)占用锁,可以直接获取锁
将当前线程包装为Node对象放入同步队列,设置为尾节点
前一个节点如果为头节点,再次尝试获取一次锁
将前一个有效节点设置为SIGNAL并不断轮询检查
阻塞直到被唤醒
判断前驱节点的状态流程为:
唤醒是在unlock()方法中。