您现在的位置:首页 > >

AQS实践(一):ReentrantLock概述

发布时间:

1 ReentrantLock类图结构

ReentrantLock是可重入的独占锁,同时只能有一个线程可以获取到该锁,其他获取该锁的线程会被阻塞而被放入该锁的AQS阻塞队列里面。

上图是ReentrantLock类图结构,通过类图可以对ReentrantLock内部有个大致的了解。
ReentrantLock是使用AQS来实现的,并且根据参数来决定内部是一个公*锁还是非公*锁。


// 默认是非公*锁
public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

Sync是AQS的子类,NonfairSync和FairSync又是Sync的子类,分别实现了非公*策略和公*策略。
简要说明一下,在ReentrantLock里面,AQS的state状态值表示线程获取该锁的可重入次数,在默认情况下,state=0表示当前锁没有被任何线程锁持有。当一个线程第一次获取该锁时,会尝试使用CAS这是state的值为1,如果CAS成功则当前线程获取了该锁,然后记录该锁的持有者为当前线程。在该线程没有释放锁的情况下,第二次获取该锁后,state会被设置为2,这就是可重入次数。在该线程释放该锁时,会尝试使用CAS让state减1,如果减1后,state为0,则当前线程释放该锁。


2 获取锁
2.1 void lock()
2.1.1 非公*锁

当一个线程调用该方法时,说明该线程希望获取该锁。如果该锁当前没有被其他线程占用,并且当前线程之前没有获取过该锁,则当前线程会获取到该锁,然后设置当前锁的拥有者为当前线程,并设置AQS的状态值为1,然后直接返回。如果当前线程之前已经获取过该锁,则这次只是简单地把AQS的状态值加1后返回。如果该锁已经被其他线程持有,则调用该方法的线程会被放入AQS阻塞队列。


public void lock() {
sync.lock();
}

在如上代码中,ReentrantLock的lock()委托给了Sync类,根据创建ReentrantLock构造函数选择Sync的实现是NonfairSync还是FairSync,这个锁是一个非公*锁或者公*锁。
下面代码是NonfairSync的lock(),也就是非公*锁。


final void lock() {
// CAS设置状态值,从0变为1
if (compareAndSetState(0, 1))
// 如果设置成功,则设置该锁的持有者是当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 否则调用AQS的acquire方法
acquire(1);
}

如下是AQS的acquire()


public final void acquire(int arg) {
// 调用Sync重写的tryAcquire
if (!tryAcquire(arg) &&
// tryAcquirea放回false会把当前线程fangruAQS阻塞队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

继续看一下Sync实现的tryAcquire


protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

// 注意acquires的值为1
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取此时state的值
int c = getState();
// 如果state的值为1
if (c == 0) {
// CAS设置state的值,从0变为1
if (compareAndSetState(0, acquires)) {
// 设置该锁的持有者是当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果该锁的持有者是当前线程
else if (current == getExclusiveOwnerThread()) {
// 简单的将state加1
int nextc = c + acquires;
// 注意精度溢出
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 如果state不为0
// 如果当前线程不是锁持有者
// 满足以上两种情况,就返回false
return false;
}

简单的流程串一下,首先获取当前线程,查看当前锁的状态是否为0,为0则说明当前该锁空闲,那么就尝试CAS获取该锁,将AQS的state从0设置为1,并设置当前锁的持有者为当前线程,然后返回true。如果当前state不为0,则说明该锁已经被某个线程所持有,素有查看当前线程是否是该锁的持有者,如果当前线程是该锁的持有者,则state加1,然后返回true。需要注意的一点,nextc<0说明可重入次数溢出了。如果当前线程不是锁的持有者则返回false,然后会被加入AQS阻塞队列。


2.1.2 非公*锁的不公*体现在何处?

首先要明白的是,非公*是说,首先尝试获取锁的线程并不一定比后尝试获取锁的线程优先获取锁。
举个例子,假设线程A调用lock()方法时执行到nonfairTryAcquire时,发现当前state不为0,然后又发现当前线程不是锁的持有者,最后返回false,然后当前线程被放入到AQS阻塞队列。这时候线程B也调用了lock()方法执行到nonfairTryAcquire时,发现当前state为0了(假设持有锁的线程释放了锁),然后通过CAS设置获取到了该锁,明明是线程A先请求获取锁,最后线程B获取到了锁,这就是非公*的体现。
简而言之,就是线程B在获取锁之前没有查看当前AQS队列里面是否有比自己更早请求该锁的线程,而是直接抢占。


2.1.3 公*锁

// 同样,acquires的值为1
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取当前的state
int c = getState();
// 如果当前state为0
if (c == 0) {
// 公*性策略
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程是锁的持有者
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

分析一下hasQueuedPredecessors这个方法。
可以大致分为以下几种情况:


如果h==t,说明当前队列为空,返回fasle若果h!=t并且s==null,说明有一个元素要作为AQS的第一个结点入队列(之前也了解过,第一个结点入队列的时候,会创建一个哨兵结点,然后将第一个结点插到哨兵结点的后面),返回true如果h!=t并且s!=null,此时 s.thread != Thread.currentThread(),则说明队列里面的第一个元素不是当前线程,那么返回true

公*锁就会多了一个公*策略,其余的操作和非公*锁的操作完全一致,在这里就不多加赘述了。


2.2 void lockInterruptibly()

与lock()方法类似,不同之处在于,它对中断进行响应,就是当前线程在调用该方法时,如果其他方法调用了当前线程的interrupt()方法时,则当前线程会抛出InterruptedException异常,然后返回。


public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 如果当前线程被中断,直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
// 调用AQS中可以被中断的方法
doAcquireInterruptibly(arg);
}

// 与acquireQueued方法类似,这里就不多加赘述
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
// 将当前线程尾插到AQS阻塞队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 检测到外界进行了中断,会抛出InterruptedException异常
// 在acquireQueued中,设置了中断标志
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

2.3 boolean tryLock()

尝试获取锁,如果当前该锁没有被其他线程持有,则当前线程获取该锁并返回true,否则返回false。该方法不会引用当前线程阻塞。


public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

从源码里,可以看出,调用了sync的nonfairTryAcquire,所以tryLock()采用的是非公*策略。


2.3 boolean tryLock(long timeout, TimeUnit unit)

尝试获取锁,并设置超时时间,如果超时时间到了没有获取到锁,就返回false。


public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
// 调用AQS的tryAcquireNanos
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

// AQS的doAcquireNanos方法
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 获取超时的绝对时间
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 计算剩余时间
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
// spinForTimeoutThreshold是静态常量,1000
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 挂起nanosTimeout后返回
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

3 释放锁
3.1 unlock()

尝试释放锁,如果当前线程持有锁,则调用该方法让该线程AQS对state减1,如果减1后,当前state为0,则当前线程会释放该锁,否则仅仅是减1。如果当前线程没有持有该锁,而调用了该方法,则会抛出IllegalMonitorStateException异常。


public void unlock() {
// 调用AQS的release
sync.release(1);
}

// AQS的release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

// Sync重写的tryRelease releases为1
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 如果不是锁持有者调用unlock,则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果当前可重入次数为0,则清空锁持有线程
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 设置可重入次数为原始值-1
setState(c);
return free;
}


热文推荐
猜你喜欢
友情链接: