网站首页 > 技术教程 正文
之前文章里ReentrantReadWriteLock、ReentrantLock、CountDownLatch、Semaphore这些concurrent包跟锁相关的类都继承了AbstractQueuedSynchronizer(AQS)的去实现锁的逻辑,具体实现可以看之前的文章,本篇文章主要是看看AQS具体用法和构造。
首先看下源码翻译,这个是了解AQS的核心功能最好的方法。
提供一个用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量、事件等)的框架。这个类被设计成是大多数同步器或者锁依赖单个原子值(int)来表示状态的基础底层。子类必须重定义有关更改此状态的protected方法,并定义当该状态获取或释放时对此对象意味着什么。考虑到这些,这个类中的其他方法是用来提供所有的排队和阻塞机制的实现。子类可以维护其他状态字段,但是只有原子性操作更新状态值(int),目前是getState、setState和compareAndSetState这三个方法的操作才与同步有关。
子类应定义为非公共内部帮助器类,用于实现其封闭类的同步属性,AbstractQueuedSynchronizer不实现任何同步接口,相反,它定义了诸如acquireInterruptible这样的方法,可以由具体的锁和相关的同步器适当地调用来实现它们的公共方法。
此类默认子类可以支持独占模式和共享模式中的一种或两者都支持。在独占模式下获取时,其他线程尝试的获取失败。由多个线程获取的共享模式可能(但不一定)成功,这个类本身不处理这些区别。当共享模式获取成功时,下一个等待线程(如果存在的话)也必须确定它是否也可以获取。在不同模式下等待的线程共享同一个FIFO队列。通常,实现子类只支持其中一种模式,但这两种模式都可以发挥作用,例如ReadWriteLock。只支持独占模式或只支持共享模式的子类不需要定义支持未使用模式的方法。
此类定义了一个嵌套的ConditionObject类,该类可由支持独占模式的子类用作Condition实现,方法isHeldExclusively报告是否对当前线程以独占方式保持同步,使用getState值调用的release方法将完全释放该对象,acquire(给定此保存的状态值)最终将该对象恢复到以前获取的状态。如果不能满足AbstractQueuedSynchronizer方法创建需要的条件约束,请不要使用它。ConditionObject的行为当然取决于其同步器实现的语义。
AQS为内部队列提供检查、检测和监视方法,以及condition的类似方法。可以根据需要使用abstractqueuedsynchronizer将它们使用到子类中,作为它们的同步机制。
AQS的序列化只存储底层的原子整数维护状态,因此反序列化的对象是空线程队列。需要序列化的子类将定义一个readobject方法,该方法在反序列化时将其恢复到已知的初始状态。
如何使用?
若要将此类用作同步器的基础,请根据需要重新定义以下方法,方法是跟getState、setState 或者 compareAndSetState有关方法,主要是用来检查或者修改状态的方法。
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
默认情况下,这些方法中的每一个都抛出UnsupportedOperationException。这些方法的实现必须是内部线程安全的,并且通常应该处理时间短而不是阻塞。定义这些方法是才能支持使用该类。所有其他方法都声明为final,因为它们不能独立地改变。
会发现从AbstractOwnableSynchronizer继承的方法有助于跟踪拥有独占同步器的线程。它鼓励子类使用它们——这些监视和诊断工具能够帮助确定哪些线程持有锁。
即使此类基于内部FIFO队列,它也不会自动实施FIFO获取策略。独占同步的核心形式如下:
Acquire:
while (!tryAcquire(arg)) {
如果线程尚未排队,请将其排队
可能阻塞当前线程;
}
Release:
if (tryRelease(arg))
解锁第一个排队线程;
(共享模式类似,但可能涉及多个信号。)
因为尝试获取锁的方法是在排队之前调用的,所以新线程获取锁可能会在阻塞和排队的其他线程之前抢先。但是,如果需要,可以定义tryAcquire或者tryAcquireShared通过内部调用一个或多个检查方法来禁用抢占,从而提供一个公平的FIFO获取顺序。特别是,大多数公平同步器可以定义 tryAcquire方法 当 如果hasQueuedPreprocessors方法(一种专门为公平同步器使用的方法)返回true时tryAcquire返回false。其他变化也是可能的。
对于默认的饥饿抢占策略,吞吐量和可伸缩性通常最高。虽然这不能保证是公平的或者没有饥饿线程,但是允许较早排队的线程在后面的队列线程之前重新尝试获取锁,并且每次重新获取锁的时候都有一个对新线程的无偏见的机会去抢占。此外,虽然acquire在通常意义上不会自旋,但可能会在阻塞之前执行多个 tryAcquire的调用,并在阻塞之前将其与其他计算混合在一起。当独占同步只被短暂保持时,这就体现了自旋的好处。如果需要公平性,可以通过前面的调用来增强这一点,获得带有“fast-path”检查的方法,可以调用预先检查hasConferred或者hasQueuedThreads方法,来保证同步器的安全性。
这个类为同步提供了一个高效和可伸缩的基础,它的使用范围是可以依赖int状态、获取和释放参数以及内部FIFO等待队列的同步器。当这还不够时,可以使用低级别构建自己的同步器,用java.util.concurrent.atomic类和自定义java.util.Queue类以及locksupport类阻塞支持。
使用例子
这是一个不可重入的互斥锁类,它使用值0表示解锁状态,而1表示锁定状态。不可重入锁强烈要求记录当前的所有者线程,这个类也这样做,为了使更易于监视。它还支持condition并公开其中一种检测方法:
class Mutex implements Lock, java.io.Serializable {
// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Reports whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}
// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provides a Condition
Condition newCondition() { return new ConditionObject(); }
// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
}
下一个例子:
这是一个门锁类类似java.util.concurrent.CountDownLatch CountDownLatch,但它只需要一个signal才能触发。因为门锁是非排它的,所以它使用可分享逻辑获取和释放方法。
class BooleanLatch {
private static class Sync extends AbstractQueuedSynchronizer {
boolean isSignalled() { return getState() != 0; }
protected int tryAcquireShared(int ignore) {
return isSignalled() ? 1 : -1;
}
protected boolean tryReleaseShared(int ignore) {
setState(1);
return true;
}
}
private final Sync sync = new Sync();
public boolean isSignalled() { return sync.isSignalled(); }
public void signal() { sync.releaseShared(1); }
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}
}
OK,上面就是AQS的介绍,那么看下具体源码实现吧
因为AQS是FIFO队列,那么它肯定有个队列的对象,那就首先看看它怎么设计的。
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus值指示线程正在等待 */
static final int 条件 = -2;
/**
* waitStatus指示下一个acquireShared应无条件传播
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
/**
* 链接到下一个等待条件的节点,或特殊值SHARED。因为conditions队列只有在独占模式下保持时
* 才被访问,所以我们只需要一个简单的链接队列来保存节点等待条件。然后将它们传输到队
* 列以重新获取。因为条件只能是排他的,所以我们用特殊值SHARED来表示共享模式来保存字段。
*/
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
可以看到跟平时的链式节点设计是一样的, 节点内容存储的是线程,然后包含前后节点,只不过多了waitStatus 标识等待的状态,可以看到对应定义的几个状态。nextWaiter 是Condition用于记录它的队列,如果是共享模式会特殊值SHARED,作为临时使用,所以Condition只能在独占模式可以使用。
然后再看看AQS其他参数
/**
Head of the wait queue, lazily initialized. Except for initialization, it is modified only via method setHead. Note: If head exists, its waitStatus is guaranteed not to be CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
可以看到AQS会记录队列的头部和尾部,通过state作为同步状态一个表示。通过unsafe操作对应这几个字段的CAS。
那么CAS的整体属性构成也大体明白了,就是通过链式的NODE去作为FIFO的基础,在通过state作为当前同步标识(锁状态,重入数等等),node中又标识了独占或者共享等待状态。也就明白的大体的设计思路。
那么再看下几个方法看下具体怎么流程的.这几个都是尝试修改同步状态,修改失败加队列,主要是分为独占模式的线程中断校验的抢占和普通抢占,以及分享模式的线程中断校验的抢占和普通抢占。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
可以看到都会执行tryAcquire或者tryAcquireShared,这个就是上文说的需要子类去实现逻辑,就是为了处理state如何作为同步状态逻辑,如果处理成功就会exclusiveOwnerThread表示为当前同步状态获取的线程。如果失败都会去执行doAcquire相关的方法,可以看到acquire会直接添加等待节点 addWaiter(Node.EXCLUSIVE),标示是独占标示。那我们看看这个4个方法吧。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到都大同小异,都是如果前置节点是head节点,就再尝试获取下同步标示,如果成功,都是将node放入head,接下来独占模式就是直接返回,分享模式是会唤醒下一个。如果没有则waitStatus设置成-1,然后进行休眠,这个4个都是一致的,但是如果线程中断,那么Interruptibly都会抛异常,而其他就会自动唤醒。
但是这个有个比较绕的地方,就是当第一个node加入的时候,node的prev和next都是本身,AQS的tail和head也设置成node。就是enq和doAcquire这块逻辑,但是上面可以看到如果二次尝试抢占成功会将node的next设置null,那么是不是就有问题了呢,又并不会,因为enq去通过tail去补偿next。这个地方逻辑是真的佩服写作者,太绕了。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
那么Acquire大体流程逻辑了解到这。再看看release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
可以看到抢占式的release首先调用tryRelease这个是子类需要重写的,用于对同步状态操作,如果需要释放,就会调用unparkSuccessor方法,主要逻辑就是唤醒队列里下一个waitStatus<=0的线程。
而共享式的调用tryReleaseShared也是子类需要重写的,也是用于对同步状态操作,如果需要释放那就将head的waitStatus尝试设置成0,如果成功就调用唤醒队列里下一个waitStatus<=0的线程方法。
好那么从尝试抢占到释放,AQS的逻辑就是这样的,整体总结就是AQS通过status去控制同步状态,但是如果控制让子类去去做,AQS不关心,AQS只F负责FIFO队列阻塞和唤醒。
今天就分析到这!
猜你喜欢
- 2024-10-30 读写锁,你难道不需要了解一下吗?
- 2024-10-30 谷歌云故障14个小时,系“队列突变大量积压”引起
- 2024-10-30 什么是AQS及其原理(aqs作用)
- 2024-10-30 码仔漫画:怎么给女朋友讲明白线程池?
- 2024-10-30 面试官:谈谈这4种磁盘IO调度算法--CFQ、NOOP、Deadline、AS
- 2024-10-30 QT的信号槽机制简介(qt信号槽优缺点)
- 2024-10-30 Java AQS(AbstractQueuedSynchronizer)详解
- 2024-10-30 AQS是什么(AQS是什么药品)
- 2024-10-30 详解磁盘IO调度算法--CFQ、NOOP、Deadline、AS
- 2024-10-30 基于AbstractQueuedSynchronizer的并发类实现
你 发表评论:
欢迎- 最近发表
-
- linux CentOS检查见后门程序的shell
- 网络安全工程师演示:黑客是如何使用Nmap网络扫描工具的?
- Linux中ftp服务修改默认21端口等(linux修改ftp配置文件)
- Linux系统下使用Iptables配置端口转发,运维实战收藏!
- 谈谈TCP和UDP源端口的确定(tcp和udp的端口号相同吗)
- Linux 系统 通过端口号找到对应的服务及相应安装位置
- 快速查找NAS未占用端口!Docker端口秒级排查+可视化占坑双杀技
- 【知识杂谈#2】如何查看Linux的(本地与公网)IP地址与SSH端口号
- 如何在Linux中查询 DNS 记录,这三个命令可谓是最常用、最经典的
- 【Linux系统编程】特殊进程之守护进程
- 标签列表
-
- 下划线是什么 (87)
- 精美网站 (58)
- qq登录界面 (90)
- nginx 命令 (82)
- nginx .http (73)
- nginx lua (70)
- nginx 重定向 (68)
- Nginx超时 (65)
- nginx 监控 (57)
- odbc (59)
- rar密码破解工具 (62)
- annotation (71)
- 红黑树 (57)
- 智力题 (62)
- php空间申请 (61)
- 按键精灵 注册码 (69)
- 软件测试报告 (59)
- ntcreatefile (64)
- 闪动文字 (56)
- guid (66)
- abap (63)
- mpeg 2 (65)
- column (63)
- dreamweaver教程 (57)
- excel行列转换 (56)
本文暂时没有评论,来添加一个吧(●'◡'●)