并发之AQS全解析

  created  by  鱼鱼 {{tag}}
创建于 2021年03月10日 01:09:25 最后修改于 2021年03月12日 00:52:08

    我们知道juc(java.util.concurrent)包下有很多实用的类,提供了很多并发工具,例如线程池、原子类、并发工具、信号量工具、锁等,可以说基本实现都为悲观锁,底层原理基本都使用了AQS(AbstractQueuedSynchronizer),AQS不是一种概念,是并发中实打实的工具类。本篇文章针对AQS做解析。

    AQS

    AQS是多线程访问共享资源的同步器框架。AQS的资源可以是独占的也可以是共享的。我们先来简单看一下它的使用方式和ApI(因为是抽象类,是不能直接使用的),下图是AQS的整体脉络。

   

    AQS核心就是一个状态值state,同时维护了一个线程的阻塞队列,队列的节点为有两种状态:SHARED(共享)和EXCLUSIVE(独占),节点状态有五种:

  • CANCELLED(1):取消状态,当线程不再希望获取锁时,设置为取消状态

  • SIGNAL(-1):当前节点的后继者处于等待状态,当前节点的线程如果释放或取消了同步状态,通知后继节点

  • CONDITION(-2):等待队列的等待状态,当调用signal()时,进入同步队列

  • PROPAGATE(-3):共享模式,同步状态的获取的可传播状态

  • 0:初始状态

直接理解AQS其实比较困难,我们可以先通过ReentrantLock的实现来理解。

从ReentrantLock理解AQS

    ReentrantLock大家都不陌生,最常见的API就是lock unlock,它其实就是基于AQS实现的,Lock中有一个子类Sync继承了AQS,又有Sync的子类FairSync和NonFairSync对应着公平所和非 公平锁,当我们无构造声明了一个ReentrantLock对象,其实是新建了Sync对象,类图如下:


在创建对象时初始化了相应的对象:

public ReentrantLock() {
    sync = new NonfairSync();
}


public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

lock()

然后让我们看lock的实现:

public void lock() {
    sync.lock();
}

//fair
final void lock() {
    acquire(1);
}
//NonFail
final void lock() {
    //CAS设置值,如果成功表示当前无锁直接上锁并设置独占线程
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

当非 公平锁此时有锁或是 公平锁时会执行acquire():

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

内部依次执行了

  • tryAcquire

  • acquireQueued(addWaiter(Node.EXCLUSIVE),1);

  • selfInterrupt()

第一步:tryAcquire可以的话直接结束流程

//nonFair tryAcquire
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
                //暂存线程用于重入
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //可重入的体现
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
// tryAcquire
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //hasQueuedPredecessors 查询是否有线程等待时间更久 用于  公平锁
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            //暂存线程用于重入
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

第二步:当获取不到锁或是可重入锁时,就会继续执行addWaiter():

//传入的是锁的状态,共享还是独占,根据此创建队列节点并放于队列尾
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //走到这里代表没有成功入队(空队伍或是乐观锁入队冲突) 进行自旋操作
    enq(node);
    return node;
}

//自旋操作
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

第三步:经过第二步,当前线程已经入队列,对入队列的节点执行acquireQueued,主要是等待获取锁的机会:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //自旋,要么获取锁要么中断
        for (;;) {
            //p是node前一个节点
            final Node p = node.predecessor();
            //如果node前一个节点是p,则轮询等待
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //获取锁失败/p不是头节点 park语义为等待
            //前驱节点状态位SIGNAL则等待,防止无限循环浪费资源
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            //设置节点状态为CLOSE 1
            cancelAcquire(node);
    }
}
//检查前驱节点是否是信号就绪状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    //前驱节点为信号待命状态(就绪通知后续节点,同时当前节点线程可以wait),则可返回true
    if (ws == Node.SIGNAL)
        return true;
    //前驱节点为取消状态,移除
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    //CAS标识前驱为信号待命
    } else {sad 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    //先标记为false,下次轮询到自然检查 
    return false;
}
//阻塞并返回线程的中断状态
private final boolean parkAndCheckInterrupt() {
    //进入waiting状态
    LockSupport.park(this);
    return Thread.interrupted();
}

可以看出整体的流程如下:

  • 快速获取锁,当前没有线程执行的时候直接获取锁

  • 尝试获取锁,当没有线程执行或是当前线程(因为可重入)占用锁,可以直接获取锁

  • 将当前线程包装为Node对象放入同步队列,设置为尾节点

  • 前一个节点如果为头节点,再次尝试获取一次锁

  • 将前一个有效节点设置为SIGNAL并不断轮询检查

  • 阻塞直到被唤醒


判断前驱节点的状态流程为:

唤醒是在unlock()方法中。

评论区
评论
{{comment.creator}}
{{comment.createTime}} {{comment.index}}楼
评论

并发之AQS全解析

并发之AQS全解析

    我们知道juc(java.util.concurrent)包下有很多实用的类,提供了很多并发工具,例如线程池、原子类、并发工具、信号量工具、锁等,可以说基本实现都为悲观锁,底层原理基本都使用了AQS(AbstractQueuedSynchronizer),AQS不是一种概念,是并发中实打实的工具类。本篇文章针对AQS做解析。

    AQS

    AQS是多线程访问共享资源的同步器框架。AQS的资源可以是独占的也可以是共享的。我们先来简单看一下它的使用方式和ApI(因为是抽象类,是不能直接使用的),下图是AQS的整体脉络。

   

    AQS核心就是一个状态值state,同时维护了一个线程的阻塞队列,队列的节点为有两种状态:SHARED(共享)和EXCLUSIVE(独占),节点状态有五种:

直接理解AQS其实比较困难,我们可以先通过ReentrantLock的实现来理解。

从ReentrantLock理解AQS

    ReentrantLock大家都不陌生,最常见的API就是lock unlock,它其实就是基于AQS实现的,Lock中有一个子类Sync继承了AQS,又有Sync的子类FairSync和NonFairSync对应着公平所和非 公平锁,当我们无构造声明了一个ReentrantLock对象,其实是新建了Sync对象,类图如下:


在创建对象时初始化了相应的对象:

public ReentrantLock() {
    sync = new NonfairSync();
}


public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

lock()

然后让我们看lock的实现:

public void lock() {
    sync.lock();
}

//fair
final void lock() {
    acquire(1);
}
//NonFail
final void lock() {
    //CAS设置值,如果成功表示当前无锁直接上锁并设置独占线程
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

当非 公平锁此时有锁或是 公平锁时会执行acquire():

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

内部依次执行了

第一步:tryAcquire可以的话直接结束流程

//nonFair tryAcquire
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
                //暂存线程用于重入
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //可重入的体现
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
// tryAcquire
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //hasQueuedPredecessors 查询是否有线程等待时间更久 用于  公平锁
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            //暂存线程用于重入
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

第二步:当获取不到锁或是可重入锁时,就会继续执行addWaiter():

//传入的是锁的状态,共享还是独占,根据此创建队列节点并放于队列尾
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //走到这里代表没有成功入队(空队伍或是乐观锁入队冲突) 进行自旋操作
    enq(node);
    return node;
}

//自旋操作
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

第三步:经过第二步,当前线程已经入队列,对入队列的节点执行acquireQueued,主要是等待获取锁的机会:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //自旋,要么获取锁要么中断
        for (;;) {
            //p是node前一个节点
            final Node p = node.predecessor();
            //如果node前一个节点是p,则轮询等待
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //获取锁失败/p不是头节点 park语义为等待
            //前驱节点状态位SIGNAL则等待,防止无限循环浪费资源
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            //设置节点状态为CLOSE 1
            cancelAcquire(node);
    }
}
//检查前驱节点是否是信号就绪状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    //前驱节点为信号待命状态(就绪通知后续节点,同时当前节点线程可以wait),则可返回true
    if (ws == Node.SIGNAL)
        return true;
    //前驱节点为取消状态,移除
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    //CAS标识前驱为信号待命
    } else {sad 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    //先标记为false,下次轮询到自然检查 
    return false;
}
//阻塞并返回线程的中断状态
private final boolean parkAndCheckInterrupt() {
    //进入waiting状态
    LockSupport.park(this);
    return Thread.interrupted();
}

可以看出整体的流程如下:


判断前驱节点的状态流程为:

唤醒是在unlock()方法中。


并发之AQS全解析2021-03-12鱼鱼

{{commentTitle}}

评论   ctrl+Enter 发送评论