网站首页 > 技术教程 正文
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer(简称AQS)位于java.util.concurrent包下。许多同步组件都是基于它实现的。譬如:ReentrantLock、ReentrantReadWriteLock、CountDownLatch、CyclicBarrier、Semaphore等等。
AbstractQueuedSynchronizer是一个抽象类,定义了许多属性和方法。其中最基本的数据结构——Node。AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。以下再描述。
CLH队列
AQS核心思想是当请求共享资源时,如果当前线程成功请求共享资源,就将共享资源锁住。其他的请求进入排队阻塞状态中,等待某一时间,唤醒重新获取。这个机制AQS主要用的是CLH队列的变体实现的。将暂时获取不到锁的线程放到队列中。
CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。具体啥是CLH队列可参看:
https://coderbee.net/index.php/concurrent/20131115/577/comment-page-1
AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点(Node)来实现锁的分配。
内部Node节点
{
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
其中:
static final Node SHARED = new Node();
用于标记一个节点在共享模式下等待
static final Node EXCLUSIVE = null;
用于标记一个节点在独占模式下等待
SHARED ,EXCLUSIVE 分别表示了线程两种锁的模式:共享模式和独占模式。
独占:只有一个线程能执行,如ReentrantLock
共享:多个线程可以同时执行,如Semaphore/CountDownLatch
其他方法和属性值:
方法和属性值 | 含义 |
waitStatus | 当前Node在队列中的状态 |
thread | 表示处于该Node的线程 |
prev | 前驱指针 |
predecessor | 返回前驱节点,没有的话抛出NullPointerException |
nextWaiter | 指向下一个处于CONDITION状态的节点(由于本篇文章不讲述Condition Queue队列,这个指针不多介绍) |
next | 后继指针 |
waitStatus有下面几个值:
枚举 | 含义 |
0 | 当一个Node被初始化的时候的默认值 |
CANCELLED | 为1,表示线程获取锁的请求已经取消了 |
CONDITION | 为-2,表示节点在等待队列中,节点线程等待唤醒 |
PROPAGATE | 为-3,当前线程处在SHARED情况下,该字段才会使用 |
SIGNAL | 为-1,表示线程已经准备好了,就等资源释放了 |
同步状态State
AQS中还维护了一个名为state的字段,意为同步状态,是由Volatile修饰的,用于展示当前临界资源的获锁情况。
private volatile int state;
同时提供了访问和更新这个变量的方法。
protected final int getState():获取State的值
protected final void setState(int newState):设置State的值
protected final boolean compareAndSetState(int expect, int update):使用CAS方式更新State
所以可以用修改state变量来实现多线程的独占模式和共享模式(加锁过程)。
独占模式:刚开始初始化state=0;当线程A独占模式下获取锁,先判断state是否为0,若不是则阻塞,加入队列。若为0,则设置state为1,获取了锁。进行后续的操作。
共享模式:刚开始初始化state=n(例如n=2);当线程A共享模式模式下获取锁,判断state是否大于0,如果不是则A阻塞,加入队列。若大于0,则stateCAS自减。获取了锁。进行后续的操作。
从ReentrantLock看AQS
AQS定义了许多方法,包括自己实现的和未实现的。
AQS对外提供一些API层方法,所以自定义同步器只要实现它的一些方法,不必关心它底层是怎么实现的。
API层方法:
protected boolean tryAcquire(int arg):
独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。
protected boolean tryRelease(int arg):
独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。
protected int tryAcquireShared(int arg):
共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected boolean tryReleaseShared(int arg):
共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。
如下图:
蓝色:AQS已经实现的方法
红色:需要自己覆盖或重写的方法
我们看看我们ReentrantLock内部怎么去实现的。平常使用ReentrantLock时候,大多这样写的:
ReentrantLock takeLock = new ReentrantLock();
// 获取锁
takeLock.lock();
try {
// 业务逻辑
} finally {
// 释放锁
takeLock.unlock();
}
进入ReentrantLock,当ReentrantLock初始化时,默认将 sync赋值为NonfairSync。
public ReentrantLock() {
sync = new NonfairSync();
}
当我们ReentrantLock takeLock = new ReentrantLock(true)这样时,
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
将 sync赋值为FairSync。
FairSync,NonfairSync为别通常我们所说的公平锁,和非公平锁的一些实现。(公平锁,和非公平锁后面再说)
ReentrantLock内有三个内部类:Sync,NonfairSync,FairSync。
其中Sync为ReentrantLock的一个内部类,它继承了AbstractQueuedSynchronizer。
abstract static class Sync extends AbstractQueuedSynchronizer
而:FairSync,NonfairSync都继承了Sync,实现了Sync的抽象方法:lock,和重写了AQS的tryAcquire方法。Sync实现了AbstractQueuedSynchronizer的抽象方法:tryRelease。
进入:takeLock.lock();
public void lock() {
sync.lock();
}
从上面所述,此时会调用NonfairSync的lock方法。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
若通过CAS设置变量State(同步状态)成功,也就是获取锁成功,则将当前线程设置为独占线程。
若通过CAS设置变量State(同步状态)失败,也就是获取锁失败,则进入父类AQS的acquire方法进行后续处理。
这时我们看下公平锁FairSync是怎么做的。
final void lock() {
acquire(1);
}
直接调用父类AQS的acquire方法。
所以从这里我们可以看出公平锁和非公平锁的区别。
公平锁:就是很公平,在并发环境中.每个线程在获取锁时会先查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,就占有锁。否则就会加入到等待队列中.以后会按照FIFO的规则从队列中取到自己。
非公平锁比较粗鲁,上来就直接尝试占有锁,如果尝试失败,就再采用类似公平锁那种方式。
在这里我们总结下上面的步骤:
- 通过ReentrantLock的加锁方法Lock进行加锁操作。
- 会调用到内部类Sync的Lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,本质上都会执行AQS的Acquire方法。
进入AQS的方法acquire:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里的tryAcquire,AQS直接抛出异常,需要子类去重写。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
AQS的Acquire方法会执行tryAcquire方法,但是由于tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire。
进入NonfairSync的tryAcquire方法:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
以上代码大意为:
首先判断state的值,如果是0,采用compareAndSetState将state设置为1,再将当前线程设置为独占线程。
如果不是为0,再次判断当前线程是否是独占线程,如果是的,对state再次加1,更新state,返回true。这里又引入一个可重入锁的概念。
- 可重入锁:当线程获取某个锁后,还可以继续获取它,可以递归调用,而不会发生死锁;
- 不可重入锁:与可重入相反,获取锁后不能重复获取,否则会死锁(自己锁自己)。
synchronized和ReentrantLock都是可重入锁。可重入锁和不可重入锁具体可参看:
https://cloud.tencent.com/developer/article/1493305
在此不做过多介绍了。
这里tryAcquire方法就介绍完了,tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。回到上面,如果没有获取到锁就执行:
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
首先看:addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter主要就是获锁失败一个入队的操作,以当前的线程和锁模式新建一个节点,尝试下快速入队。接着将tail赋予pred变量。如果pred不为null,说明队列有节点。将New出的Node的prev指针指向pred。通过compareAndSetTail方法,完成尾节点的设置。
这里New出的节点变成tail节点,新tail节点的prev指向原先的tail节点,next为null。
原先的tail节点的prev不变,next指向新tail节点。
如果pred为null,则执行enq(node);
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
t为null,队列是没有元素,首先会构建一个新节点,head和tail同时指向这个新Node。
如果t不为null,则向之前那样,进行队尾入队操作。
总结:如果队列刚开始是空的,首先会构建一个新的Node,head和tail指针同时指向它,再往队列队尾入队。
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
addWaiter做完了,并返回是一个包含该线程的Node。接着看下acquireQueued。
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 标记等待过程中是否中断过
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果p是头结点,说明当前节点在真实数据队列的首部,头节点为虚节点不存储一些值
//也就是当前节点为头节点的下一个节点
//在这里会尝试获取锁,万一此时头节点正好释放了锁
//如果成功获取了锁,就将自己设置为head,占位成功
//但没有修改waitStatus
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquire尝试获取锁,addWaiter是获锁失败了加入队列,acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。
进入acquireQueued方法,首先会判断当前节点的前驱节点是否为头(head)节点,如果是则再次的尝试获取锁。如果不是,则进入shouldParkAfterFailedAcquire(p, node)。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
从shouldParkAfterFailedAcquire方法名字,可以看出,如果获取锁失败,是否需要阻塞。不阻塞,这里的for(;;)循环会一直执行下去。
Node node:当前节点,Node pred:当前节点的前驱节点。首先会判断前驱节点的waitStatus。如果是Node.SIGNAL,则返回true。
如果前驱节点的waitStatus>0,则waitStatu是为1,为CANCELLED(只有在这种状态下才大于0)。表示线程获取锁的请求已经取消了。将循环向前查找取消节点,把取消节点从队列中剔除。
其他情况下,即前驱节点的waitStatus改为Node.SIGNAL。这样下次循环再次判断,将返回true。
判断了前驱节点的waitStatus,将进入parkAndCheckInterrupt方法。从方法名字也可以看出此方法是干嘛的,阻塞和检查中断。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
从上面代码,它真的就干了两件事,阻塞当前的线程,返回当前线程的中断标识位(关于线程的中断,可以看看相关的中断知识)。如果被中断了,后面某时刻会调用selfInterrupt();
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
在acquireQueued方法有个finally块:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
...
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
...
failed = false;
...
}
...
} finally {
if (failed)
cancelAcquire(node);
}
}
这段代码是干什么的。在AQS中类似其他的方法中,我们可以找到这块代码的意义。
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在发生中断或其他情况下,进入取消获取锁cancelAcquire方法。
private void cancelAcquire(Node node) {
// 忽略不存在的节点
if (node == null)
return;
//将该节点不关联任何线程
node.thread = null;
// 跳过当前节点之前的CANCELLED的节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
// 把当前node的状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
// 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果当前节点不是head的后继节点,
//1:判断当前节点前驱节点的是否为SIGNAL,
//2:如果不是,则把前驱节点设置为SINGAL看是否成功
// 如果1和2中有一个为true,再判断当前节点的线程是否为null
// 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
unparkSuccessor(node);
}
//其他情况下将node的后继指针指向当前节点
node.next = node; // help GC
}
}
获取当前节点的前驱节点,如果前驱节点的状态是CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的Pred节点和当前Node关联,将当前Node设置为CANCELLED。
最后Node可能处于以下几种情况:
1.当前节点是尾节点。
2.当前节点是Head的后继节点
3.当前节点不是Head的后继节点,也不是尾节点
至此介绍完了,ReentrantLock的lock方法获取锁的过程。
接着介绍ReentrantLock的unlock释放锁的过程。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
unlock方法会调用父类AQS的release方法,接着会调用Sync类重写的tryRelease方法。释放锁没有公平锁和非公平锁的说法,在这里Sync内部类自己重写了。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
获取state值,再去releases(这里为1)可重入数。如果当前线程不是持有锁的线程则会抛出异常。接着判断更新后的state是否为0,如果是0,则持有锁线程已经全部释放,将ExclusiveOwnerThread置为null,更新state。如果不是,更新state。
执行完tryRelease,
public final boolean release(int arg) {
if (tryRelease(arg)) {
//// 获取头结点
Node h = head;
//// 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
//这里的node传入的是head
//获取head的waitStatus
int ws = node.waitStatus;
if (ws < 0)
//如果waitStatus<0,将如果waitStatus设置为0
compareAndSetWaitStatus(node, ws, 0);
//获取head的后继节点
Node s = node.next;
//如果后继节点是null了,或者是节点被cancelled了
if (s == null || s.waitStatus > 0) {
s = null;
//从tail位置向前遍历,取初第一个waitStatus <= 0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//找到了这个节点将,这个节点唤醒,
//唤醒后,又跑到之前分析的方法acquireQueued,尝试获取锁
LockSupport.unpark(s.thread);
}
但是为什么要从后往前找第一个非Cancelled的节点呢?
从之前addWaiter代码看:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
我们从这里可以看到,节点入队并不是原子操作,也就是说,node.prev = pred; compareAndSetTail(pred, node) 这两个地方可以看作Tail入队的原子操作,但是此时pred.next = node;还没执行,如果这个时候执行了unparkSuccessor方法,就没办法从前往后找了,所以需要从后往前找。
还有一点原因,在产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的Node。
综上所述,如果是从前往后找,由于极端情况下入队的非原子操作和CANCELLED节点产生过程中断开Next指针的操作,可能会导致无法遍历所有的节点。所以,唤醒对应的线程后,对应的线程就会继续往下执行。
至此,通过ReentrantLock的lock和unlock方法,进而分析AQS的原理解析结束了。其他同步类Semaphore,CountDownLatch,ReentrantReadWriteLock原理也是很相近的。
自定义同步工具
通过以上分析,我们也可以用AQS来是实现一个简单的同步工具类。
public interface MyLock {
void lock();
void unlock();
}
public class MyCustomLock implements MyLock{
//像ReentrantLock那样,引入Sync内部类,我们只要重现了tryRelease,tryAcquire。其他交给Sync
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire (int arg) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease (int arg) {
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively () {
return getState() == 1;
}
}
private Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void unlock() {
sync.release(1);
}
}
public class Lock_Test extends Thread {
// 定义变量记录总票数
private static int tickets = 100;
// 创建互斥锁对象
private static MyLock lockObj = new MyCustomLock();
@Override
public void run() {
// 使用循环保证票能够被卖完
while (true) {
try {
// 模拟休眠
Thread.sleep(500);
// 获取锁
lockObj.lock();
// 开始卖票
if (tickets > 0) {
// 卖一张
System.out.println(this.getName() + " 卖了一张票,还剩 " + (--tickets) + " 张");
continue;
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放锁
lockObj.unlock();
}
System.out.println("票没了...");
break;
}
}
public static void main(String[] args) {
Lock_Test t1=new Lock_Test();
Lock_Test t2=new Lock_Test();
Lock_Test t3=new Lock_Test();
t1.start();
t2.start();
t3.start();
}
}
最后打印结果:
这里可以看出AQS框架的强大,只需要不多的代码,就可以自己实现一个同步工具类。
参考资料:
https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
猜你喜欢
- 2024-10-30 读写锁,你难道不需要了解一下吗?
- 2024-10-30 谷歌云故障14个小时,系“队列突变大量积压”引起
- 2024-10-30 什么是AQS及其原理(aqs作用)
- 2024-10-30 码仔漫画:怎么给女朋友讲明白线程池?
- 2024-10-30 面试官:谈谈这4种磁盘IO调度算法--CFQ、NOOP、Deadline、AS
- 2024-10-30 QT的信号槽机制简介(qt信号槽优缺点)
- 2024-10-30 Java AQS(AbstractQueuedSynchronizer)详解
- 2024-10-30 AQS是什么(AQS是什么药品)
- 2024-10-30 详解磁盘IO调度算法--CFQ、NOOP、Deadline、AS
- 2024-10-30 基于AbstractQueuedSynchronizer的并发类实现
你 发表评论:
欢迎- 最近发表
-
- linux CentOS检查见后门程序的shell
- 网络安全工程师演示:黑客是如何使用Nmap网络扫描工具的?
- Linux中ftp服务修改默认21端口等(linux修改ftp配置文件)
- Linux系统下使用Iptables配置端口转发,运维实战收藏!
- 谈谈TCP和UDP源端口的确定(tcp和udp的端口号相同吗)
- Linux 系统 通过端口号找到对应的服务及相应安装位置
- 快速查找NAS未占用端口!Docker端口秒级排查+可视化占坑双杀技
- 【知识杂谈#2】如何查看Linux的(本地与公网)IP地址与SSH端口号
- 如何在Linux中查询 DNS 记录,这三个命令可谓是最常用、最经典的
- 【Linux系统编程】特殊进程之守护进程
- 标签列表
-
- 下划线是什么 (87)
- 精美网站 (58)
- qq登录界面 (90)
- nginx 命令 (82)
- nginx .http (73)
- nginx lua (70)
- nginx 重定向 (68)
- Nginx超时 (65)
- nginx 监控 (57)
- odbc (59)
- rar密码破解工具 (62)
- annotation (71)
- 红黑树 (57)
- 智力题 (62)
- php空间申请 (61)
- 按键精灵 注册码 (69)
- 软件测试报告 (59)
- ntcreatefile (64)
- 闪动文字 (56)
- guid (66)
- abap (63)
- mpeg 2 (65)
- column (63)
- dreamweaver教程 (57)
- excel行列转换 (56)
本文暂时没有评论,来添加一个吧(●'◡'●)