我们知道juc(java.util.concurrent)包下有很多实用的类,提供了很多并发工具,例如线程池、原子类、并发工具、信号量工具、锁等,可以说基本实现都为悲观锁,底层原理基本都使用了AQS(AbstractQueuedSynchronizer),AQS不是一种概念,是并发中实打实的工具类。本篇文章针对AQS做解析。
AQS
AQS是多线程访问共享资源的同步器框架。AQS的资源可以是独占的也可以是共享的。我们先来简单看一下它的使用方式和ApI(因为是抽象类,是不能直接使用的),下图是AQS的整体脉络。
AQS核心就是一个状态值state,同时维护了一个线程的阻塞队列,队列的节点为有两种状态:SHARED(共享)和EXCLUSIVE(独占),节点状态有五种:
CANCELLED(1):取消状态,当线程不再希望获取锁时,设置为取消状态
SIGNAL(-1):当前节点的后继者处于等待状态,当前节点的线程如果释放或取消了同步状态,通知后继节点
CONDITION(-2):等待队列的等待状态,当调用signal()时,进入同步队列
PROPAGATE(-3):共享模式,同步状态的获取的可传播状态
0:初始状态
直接理解AQS其实比较困难,我们可以先通过ReentrantLock的实现来理解。
从ReentrantLock理解AQS
ReentrantLock大家都不陌生,最常见的API就是lock unlock,它其实就是基于AQS实现的,Lock中有一个子类Sync继承了AQS,又有Sync的子类FairSync和NonFairSync对应着公平所和非

在创建对象时初始化了相应的对象:
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
lock()
然后让我们看lock的实现:
public void lock() {
sync.lock();
}
//fair
final void lock() {
acquire(1);
}
//NonFail
final void lock() {
//CAS设置值,如果成功表示当前无锁直接上锁并设置独占线程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
当非
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
内部依次执行了
tryAcquire
acquireQueued(addWaiter(Node.EXCLUSIVE),1);
selfInterrupt()
第一步:tryAcquire可以的话直接结束流程
//nonFair tryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
//暂存线程用于重入
setExclusiveOwnerThread(current);
return true;
}
}
//可重入的体现
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//hasQueuedPredecessors 查询是否有线程等待时间更久 用于 公平锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//暂存线程用于重入
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
第二步:当获取不到锁或是可重入锁时,就会继续执行addWaiter():
//传入的是锁的状态,共享还是独占,根据此创建队列节点并放于队列尾
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//走到这里代表没有成功入队(空队伍或是乐观锁入队冲突) 进行自旋操作
enq(node);
return node;
}
//自旋操作
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
第三步:经过第二步,当前线程已经入队列,对入队列的节点执行acquireQueued,主要是等待获取锁的机会:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋,要么获取锁要么中断
for (;;) {
//p是node前一个节点
final Node p = node.predecessor();
//如果node前一个节点是p,则轮询等待
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//获取锁失败/p不是头节点 park语义为等待
//前驱节点状态位SIGNAL则等待,防止无限循环浪费资源
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//设置节点状态为CLOSE 1
cancelAcquire(node);
}
}
//检查前驱节点是否是信号就绪状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//前驱节点为信号待命状态(就绪通知后续节点,同时当前节点线程可以wait),则可返回true
if (ws == Node.SIGNAL)
return true;
//前驱节点为取消状态,移除
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
//CAS标识前驱为信号待命
} else {sad
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//先标记为false,下次轮询到自然检查
return false;
}
//阻塞并返回线程的中断状态
private final boolean parkAndCheckInterrupt() {
//进入waiting状态
LockSupport.park(this);
return Thread.interrupted();
}
可以看出整体的流程如下:
快速获取锁,当前没有线程执行的时候直接获取锁
尝试获取锁,当没有线程执行或是当前线程(因为可重入)占用锁,可以直接获取锁
将当前线程包装为Node对象放入同步队列,设置为尾节点
前一个节点如果为头节点,再次尝试获取一次锁
将前一个有效节点设置为SIGNAL并不断轮询检查
阻塞直到被唤醒

判断前驱节点的状态流程为:

唤醒是在unlock()方法中。

