Java并发编程实战:第14章 构建自定义的同步工具

1. 管理状态依赖性

1.1 管理状态依赖性

  • 状态依赖性指某种操作必须依赖于指定的状态才可以执行。比如一个阻塞队列的take方法依赖于这个阻塞队列中有至少一个元素这个状态。
  • 如果一个状态依赖性操作所依赖的状态不满足,通常有几种处理办法:
    1. 抛出异常
    2. 使用某种约定的错误返回值
    3. 阻塞,直到依赖的状态被满足

状态依赖的可阻塞行为结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void blockingAction() throws InterruptedException {
// ...
// 请求 依赖的状态 锁
// ...

// 依赖条件不成立
while (不成立的条件) {
// 释放锁

// ...
// 直到依赖条件成立。如果中断或超时过期,则可选失败
// ...

// 请求锁
}

// do something
// 释放锁
}

生产者-消费者的设计经常会使用 ArrayBlockingQueue 这种有限缓存。一个有限缓存提供的 put 和 take 操作,每一个都有先验条件:不能从空缓存中获取元素,也不能把元素置入已满的緩存中。如果依赖于状态的操作在处理先验条件时失败,可以抛出异常或者返回错误状态(把问题留给调用者),也可以保持阻塞直到对象转入正确的状态。

示例:有限缓存的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

@ThreadSafe
public abstract class BaseBoundedBuffer<V> {

@GuardedBy("this")
private final V[] buf;

@GuardedBy("this")
private int tail;

@GuardedBy("this")
private int head;

@GuardedBy("this")
private int count;

protected BaseBoundedBuffer(int capacity) {
this.buf = (V[]) new Object[capacity];
}

protected synchronized final void doPut(V v) {
buf[tail] = v;
if (++tail == buf.length) {
tail = 0;
}
++count;
}

protected synchronized final V doTake() {
V v = buf[head];
buf[head] = null;
if (++head == buf.length) {
head = 0;
}
--count;
return v;
}

public synchronized final boolean isFull() {
return count == buf.length;
}

public synchronized final boolean isEmpty() {
return count == 0;
}
}

1.2 将先验条件失败传给调用者

如果有缓存不满足先验条件,会停滞不前

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import net.jcip.annotations.ThreadSafe;

@ThreadSafe
public class GrumpyBoundedBuffer <V> extends BaseBoundedBuffer<V> {

public GrumpyBoundedBuffer() {
this(100);
}

public GrumpyBoundedBuffer(int size) {
super(size);
}

public synchronized void put(V v) throws BufferFullException {
if (isFull()) {
throw new BufferFullException();
}
doPut(v);
}

public synchronized V take() throws BufferEmptyException {
if (isEmpty()) {
throw new BufferEmptyException();
}
return doTake();
}
}

class BufferFullException extends RuntimeException {
}

class BufferEmptyException extends RuntimeException {
}

如果向 GrumpyBoundedBuffer 中添加、拿取元素,GrumpyBoundedBuffer 已经满了、没有元素,那么将抛出 RuntimeException。

调用 GrumpyBoundedBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class GrumpyBoundedBufferDemo {
private GrumpyBoundedBuffer<String> buffer;
int SLEEP_GRANULARITY = 50;

public void useBuffer() throws InterruptedException {
while (true) {
try {
String item = buffer.take();
// ... ...
break;
} catch (BufferEmptyException e) {
Thread.sleep(SLEEP_GRANULARITY);
// 建议使用:Thread.yield() 向调度程序提示当前线程愿意放弃其当前对处理器的使用。而不占用CPU
}
}
}
}

如果 GrumpyBoundedBuffer 中没有元素,那么 sleep,循环等待有元素可用。

1.3 利用 “轮询加休眠” 实现拙劣的阻塞

有限缓存使用了拙劣的阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import net.jcip.annotations.ThreadSafe;

@ThreadSafe
public class SleepyBoundedBuffer <V> extends BaseBoundedBuffer<V> {
int SLEEP_GRANULARITY = 60;

public SleepyBoundedBuffer() {
this(100);
}

public SleepyBoundedBuffer(int size) {
super(size);
}

public void put(V v) throws InterruptedException {
while (true) {
synchronized (this) {
if (!isFull()) {
doPut(v);
return;
}
}
Thread.sleep(SLEEP_GRANULARITY);
}
}

public V take() throws InterruptedException {
while (true) {
synchronized (this) {
if (!isEmpty()) {
return doTake();
}
}
Thread.sleep(SLEEP_GRANULARITY);
}
}
}

SleepyBoundedBuffer 与 GrumpyBoundedBuffer 不同的是,在 put 和 take 操作时,使用轮询+休眠的方式,直到满足条件。

像大多数行为良好的阻塞库的方法一样,SleepyBoundedBuffer 通过中断支持取消,如果被中断,它会提前返回,并抛出 InterruptedException。

1.4 使用条件队列解决拙劣的阻塞

条件队列可以让一组线程(等特集),以某种方式等待相关条件变成真。不同于传统的队列,它们的元素是数据项:条件队列的元素是等待相关条件的线程。

有限缓存使用条件队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import net.jcip.annotations.ThreadSafe;

@ThreadSafe
public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
public BoundedBuffer() {
this(100);
}

public BoundedBuffer(int size) {
super(size);
}

public synchronized void put(V v) throws InterruptedException {
while (isFull()) {
wait();
}
doPut(v);
notifyAll();
}

public synchronized V take() throws InterruptedException {
while (isEmpty()) {
wait();
}
V v = doTake();
notifyAll();
return v;
}

public synchronized void alternatePut(V v) throws InterruptedException {
while (isFull()) {
wait();
}
boolean wasEmpty = isEmpty();
doPut(v);
if (wasEmpty) {
notifyAll();
}
}
}

使用了超类 Object 中的 wait()、notify()、notifyAll()方法:

1
2
3
4
5
6
7
8
// 使当前线程等待,直到另一个线程为此对象调用notify()方法或notifyAll()方法。
public final void wait() throws InterruptedException { }

// 唤醒正在此对象的监视器上等待的单个线程。
public final native void notify();

// 唤醒正在此对象的监视器上等待的所有线程。
public final native void notifyAll();

2. 使用条件队列

条件队列让构建有效且响应的状态依赖类变得更容易,但是使用时比较容易出错。应该尽量构建在像 LinkedBlockingQueue、Latch、Semaphore 和 FutureTask 基础之上。

2.1 条件谓词

  • 条件调词是先验条件的第一站,它在一个操作与状态之间建立起依赖关系。

  • 在有限缓存中,只有缓存不为空时 take 才能执行,否则它必须等待。就 take 而言,它的条件谓词是“绶存不空”,take 执行前必须先测试。类似地,put 的条件谓词是“缓存不满”。

  • 在条件等待中存在一种重要的三元关系,包括:加锁、wait() 方法、条件谓词

每次调用 wait 都会您式地与特定的永什调调相关联。当调用特定条件谓词的 wait 时,调用者必须已经持有了与条件队列相关的锁,这个锁必须同时还保护着组成条件谓词的状态变量。

2.2 过早地唤醒

当线程A调用到 wait() 代码后,其它线程在此期间插足了 —- 获取了锁,并且修改了数据。那么线程A获取锁后,需要重新检查条件谓词。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public synchronized void put(V v) throws InterruptedException {
// 条件谓词:isFull()
while (isFull()) {
wait();
}
doPut(v);
notifyAll();
}

public synchronized V take() throws InterruptedException {
// 条件谓词:isEmpty()
while (isEmpty()) {
wait();
}
V v = doTake();
notifyAll();
return v;
}

【线程A】调用空缓存中的 take(),不满足条件(绶存不空),于是调用 wait() 使当前线程等待。

【线程B】调用该空缓存中的 put(),满足条件(缓存不满),于是将一个元素放入缓存,并且调用 notifyAll(),唤醒【线程A】。

【线程A】此时需要再次检查条件谓词是否满足。

不过有的时候,我们可能根本不知道哪个方法调用了 notify() 或 notifyAll(),唤醒正在 wait() 的线程。如果被唤醒的线程去检查条件谓词,还不能满足条件,就又要继续等待。

状态依赖方法的规范式

1
2
3
4
5
6
7
8
9
void stateDependentMethod() throws InterruptedException {
// 条件谓词必须被锁守护
synchronized(lock) {
while (!conditionPredicate()) {
lock.wait();
}
// 现在,对象处于期望的状态中
}
}

当使用条件等待时(Object.wait 或者 Condition.await)

  • 永远设置一个条件谓词 —- 一些对象状态的测试,线程执行前必须满足它;
  • 永远在调用 wait 前测试条件谓词,并且从 wait 中返回后再次测试;
  • 永远在循环中调用 wait;
  • 确保构成条件谓词的状态变量被锁保护,而这个锁正是与条件队列相关联的;
  • 当调用 wait、 notify 或者 notifyAll 时,要持有与条件队列相关联的锁;并且在检查条件谓词之后、开始执行被保护的逻辑之前,不要释放锁。

2.3 丢失的信号

保证 notify() 一定在 wait() 之后。

2.4 通知

  • 在有限缓存中,在绶存变为非空时,为了能够让 take 解除阻塞,我们必须确保每一条能够让缓存变为非空的代码路径都执行一个通知。

    例如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public synchronized void alternatePut(V v) throws InterruptedException {
    while (isFull()) {
    wait();
    }
    boolean wasEmpty = isEmpty();
    doPut(v);
    if (wasEmpty) {
    notifyAll();
    }
    }

    向有限缓存中添加一个元素,在最后调用 notifyAll() 通知正在 wait() 的线程。

  • 对于 notify() 和 notifyAll() 无论调用哪一个,都必须持有与条件队列对象相关联的锁。

    • 调用 notify() 的结果是:JVM 会从在这个条件队列中等待的众多线程中挑选出一个,并把它唤醒。
    • 而调用 notifyAll() 会唤醒所有正在这个条件队列中等待的线程。
    • 由于调用 notify() 和 notifyAll() 时必须持有条件队列对象的锁,这导致等待线程此时不能重新获得锁,无法从 wait() 返回,因此该通知线程应该尽快释放锁,以确保等待线程尽可能快地解除阻塞。
  • 推荐使用 notifyAll(),因为多个线程可能会由于不同的原因在同一个条件队列中等待,调用 notifyAll() 会唤醒所有,而调用 notify() 只会由 JVM 挑选一个唤醒。

2.5 阀门类

开始阀门闭锁(starting gate latch)

二元闭锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import java.util.concurrent.CountDownLatch;

/**
* 在时序测试中,使用 CountDownLatch 来启动和停止线程
*
* CountDownLatch.class ---- 利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。
*/
public class TestHarness {
/**
* 使用 CountDownLatch 来启动和停止线程
* @param nThreads 要启动的线程数
* @param task 线程任务
* @return 线程等待,释放时间
* @throws InterruptedException 当线程等待、休眠或以其他方式被占用,并且线程在活动之前或期间被中断时抛出
*/
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException {
// 在线程可以通过await()方法之前,计数值为1,需调用countDown()方法的次数为1次
final CountDownLatch startGate = new CountDownLatch(1);
// 在线程可以通过await()方法之前,计数值为 nThreads,需调用countDown()方法的次数为 nThreads 次
final CountDownLatch endGate = new CountDownLatch(nThreads);
// 通过await()方法需满足:锁计数器为0、线程被中断、或者超过指定的等待时间
// 每调用countDown()方法,计数器减1

for (int i = 0; i < nThreads; i++) {
Thread aThread = new Thread() {
@Override
public void run() {
try {
// 线程等待
startGate.await();
try {
// 启动线程
task.run();
} finally {
// 减少锁存器的计数,如果计数达到零,则释放所有等待线程
endGate.countDown();
}
} catch (InterruptedException ignored) {}
}
};
aThread.start();
}

long start = System.nanoTime();
// startGate 锁计数器(原本定义为1),现在调用 countDown() 方法,减去1
// 此时,startGate 的锁计数器为0,可以通过 await() 方法了
// 因此就可以到达 task.run()
startGate.countDown();
// endGate 锁计数器原本定义为nThreads,也就是线程个数。
// 程序未出现异常情况,需要调用 countDown() 方法 nThreads 次
// endGate.countDown() 方法在每个线程的 finally {...} 中
// 因此需要 nThreads 个线程都完成,才能使得 锁计数器=0,这个时候才能通过 endGate.await() 方法
endGate.await();
long end = System.nanoTime();
return end - start;
}
}

这里通过将计数器初始化为1,创建了一个二元闭锁:它只有两种状态,初始状态和终止状态。闭锁会阻止线程通过开始阀门,直到阀门被打开,此时所有的线程都可以通过。

使用 wait 和 notifyAll 实现可重关闭的阀门

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

@ThreadSafe
public class ThreadGate {
// 条件谓词: opened-since(n) (isOpen || generation>n)
@GuardedBy("this")
private boolean isOpen;
@GuardedBy("this")
private int generation;

public synchronized void close() {
isOpen = false;
}

public synchronized void open() {
++generation;
isOpen = true;
notifyAll();
}

// 阻塞,直到: opened-since(generation on entry)
public synchronized void await() throws InterruptedException {
int arrivalGeneration = generation;
while (!isOpen && arrivalGeneration == generation) {
wait();
}
}
}

2.6 显式的 Condition 对象

Lock 是一个广义的内置锁,而 Condition 也是一种广义的 内置条件队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package java.util.concurrent.locks;

public interface Condition {
/** 使当前线程等待,直到它发出信号或被中断。 */
void await() throws InterruptedException;

/** 使当前线程一直等待,直到它发出信号或中断,或者指定的等待时间过去。 */
boolean await(long time, TimeUnit unit)throws InterruptedException;

/** 使当前线程一直等待,直到它发出信号或中断,或者指定的等待时间过去。 */
long awaitNanos(long nanosTimeout) throws InterruptedException;

/** 导致当前线程等待,直到它发出信号。 */
void awaitUninterruptibly();

/** 使当前线程等待,直到它发出信号或中断,或指定的截止日期过去。 */
boolean awaitUntil(Date deadline) throws InterruptedException;

/** 唤醒一个等待线程。*/
void signal();

/** 唤醒所有等待的线程。 */
void signalAll();
}
  • 内部条件队列有一些缺陷。每个内部锁只能有一个与之相关联的条件队列,多个线程可能为了不同的条件谓词在同一个条件队列中等待,而且大多数常见的锁模式都会暴露条件队列对象。这些因素都导致不可能为了使用notifyAll(),而强迫等待线程统一。
  • 如果想编写一个带有多个条件谓词的并发对象,或者想获得除了条件队列可见性之外的更多的控制权,可以使用Lock和Condition,而不是内置锁和内置条件队列。
  • 一个 Conaition 和一个单独的 Lock 相关联,就像条件队列和单独的内部锁相关联一样;调用与 Condition 相关联的 Lock 的 Lock.newConaition() 方法,可以创建一个 Conaition。如同 Lock 提供了比内部加锁要丰富得多的特征集一样,Condition 也提供了比内部条件队列要丰富得多的特征集:每个锁可以有多个等待集、可中断/不可中断的条件等待、基于时限的等待以及公平/非公平队列之间的选择。
  • 不同于内部条件队列,你可以让每个 Lock 都有任意数量的 Condition 对象。

警告: Object 的 wait()、notify() 和notifyAll()方法在 Conaition 中的对等体是 await()、 signal() 和 BignalAll()。

但是 Condition 继承自 Object,这意味着它也有 wait() 和 notify() 方法。

一定要确保使用了正确的版本 —- await() 和 signal()

有限缓存使用显式的条件变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@ThreadSafe
public class ConditionBoundedBuffer <T> {
protected final Lock lock = new ReentrantLock();

// 条件谓词: notFull (count < items.length)
private final Condition notFull = lock.newCondition();
// 条件谓词: notEmpty (count > 0)
private final Condition notEmpty = lock.newCondition();

private static final int BUFFER_SIZE = 100;
@GuardedBy("lock")
private final T[] items = (T[]) new Object[BUFFER_SIZE];
@GuardedBy("lock")
private int tail, head, count;

// 阻塞,直到: notFull
public void put(T x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await();
}
items[tail] = x;
if (++tail == items.length) {
tail = 0;
}
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}

// 阻塞直到: notEmpty
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
T x = items[head];
items[head] = null;
if (++head == items.length) {
head = 0;
}
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}

Condition 简化了使用单一通知的条件。使用更有效的 signal(),而不是 signalAll(),这就会减少相当数量的上下文转换,而且每次缓存操作都会触发对锁的请求。

3. 剖析 Synchronizer

  • ReentrantLock 和 Semaphore 有很多共同点。这些类都扮演了“阀门”的角色,每次只允许有限数目的线程通过它。

  • 线程到达阀门后

    • 可以允许通过:ReentrantLock.lock()、Semaphore.acquire()
    • 可以等待:lock()方法 或 acquire() 方法阻塞
    • 可以取消:ReentrantLock.tryLock()、Semaphore.tayAcquire() 返回 false,指明在允许的时间内,【锁】或者【许可】不可用
  • 它们都允许可中断的、不可中断的、可限时的请求尝试,它们也都允许选择公平、非公平的等待线程队列。

  • 其实,ReentrantLock 和 Semaphore 都用到了同一个共同的基类:AbstractQueuedSynchronizer

  • AbstractQueuedSynchronizer(AQS) 和其它很多的 Synchronizer 一样,AQS 是一个用来构建锁和 Synchronizer 的框架。

  • CountDownLatch、ReentrantReadWriteLock 和 FutureTask 都是基于 AQS 构建的。

使用 Lock 实现的计数信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@ThreadSafe
public class SemaphoreOnLock {
private final Lock lock = new ReentrantLock();

// 条件谓词: permitsAvailable (permits > 0)
private final Condition permitsAvailable = lock.newCondition();
@GuardedBy("lock")
private int permits;

SemaphoreOnLock(int initialPermits) {
lock.lock();
try {
permits = initialPermits;
} finally {
lock.unlock();
}
}

// 阻塞,直到: permitsAvailable
public void acquire() throws InterruptedException {
lock.lock();
try {
while (permits <= 0) {
permitsAvailable.await();
}
--permits;
} finally {
lock.unlock();
}
}

public void release() {
lock.lock();
try {
++permits;
permitsAvailable.signal();
} finally {
lock.unlock();
}
}
}

在 SemaphoreonLock 中,请求许可的操作在两个地方可能会阻塞

  • 信号量的状态正在被锁保护着
  • 许可不可用

AQS 解决了实现一个 Synchronizer 的大量细节,比如等待线程的 FIFO(First Input, First Output) 队列。单独的 Synchronizer 可以定义一个灵活的标准,用来描述线程是否应该允许通过,还是需要等待。

使用 AQS 构建的 Synchronizer 只可能在一个点上发生阻塞,这样降低了上下文切换的开销,并提高了吞吐量。

4. AbstractQueuedSynchronizer

4.1 AbstractQueuedSynchronizer

  • 一个基于 AQS 的 synchronizer 所执行的基本操作,是一些不同形式的获取(acquire) 和释放(release)。

  • AQS 管理着一个广域状态信息的单一整数,状态信息可以通过 getState()、setState()、compareAndSetState()等方法进行操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 返回同步状态的当前值。此操作具有volatile读取的内存语义
    protected final int getState() {
    return state;
    }

    // 设置同步状态的值。此操作具有volatile写入的内存语义
    protected final void setState(int newState) {
    state = newState;
    }

    // 如果当前状态值等于预期值,则自动将同步状态设置为给定的更新值。此操作具有volatile读写的内存语义
    protected final boolean compareAndSetState(int expect, int update) {
    // ...
    }
    • ReentrantLock 用它来表现拥有它的线程已经请求了多少次锁
    • Semaphore 用它来表现剩余的许可数
    • Futurerask 用它来表现任务的状态(尚未开始、运行、完成和取消)
  • AQS中获取和释放操作的规范式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    boolean acquire() throws InterruptedException {
    while (状态不允许获取) {
    if (阻止获取请求) {
    如果尚未排队阻塞当前线程,
    } else {
    否则则将当前线程入队
    return failure
    }
    }
    可能更新同步状态
    如果线程已排队,则使线程出队
    return success
    }

    void release() {
    更新同步状态
    if (新状态可能允许被阻塞的线程获取) {
    解除阻塞一个或多个排队线程
    }
    }

    一个获取操作分为两步。第一步,Synchronizer 判断当前状态是否允许被获得:如果是,就让线程执行,如果不是,获取操作阻塞或失败。第二步包括了可能需要更新的状态;一个想获取 synchronizer 的线程会影响到其他线程是否能够获取它。

  • 一个简单的闭锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    import net.jcip.annotations.ThreadSafe;

    import java.util.concurrent.locks.AbstractQueuedSynchronizer;

    @ThreadSafe
    public class OneShotLatch {
    private final Sync sync = new Sync();

    public void signal() {
    sync.releaseShared(0);
    }

    public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(0);
    }

    private class Sync extends AbstractQueuedSynchronizer {
    protected int tryAcquireShared(int ignored) {
    // 如果闭锁打开(state == 1)则成功,否则失败
    return (getState() == 1) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int ignored) {
    setState(1); // 闭锁现已打开
    return true; // 其他线程现在可能能够获取
    }
    }
    }

4.2 ReentrantLock

ReentrantLock 只支持独占的获取操作,因此它实现了 tryAcquire()、 tryRelease() 和 isHeldExclusively()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ReentrantLock implements Lock, java.io.Serializable {
abstract static class Sync extends AbstractQueuedSynchronizer {
// 执行不公平的 tryLock
final boolean nonfairTryAcquire(int acquires) {
// ......
}
// 尝试设置状态以反映独占模式下的发布
protected final boolean tryRelease(int releases) {
// ......
}
// 如果同步仅针对当前(调用)线程进行,则返回true
protected final boolean isHeldExclusively() {
// ......
}
}

static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
}

非公平的 ReentrantLock 中 tryAcquire 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 同步状态的当前值
int c = getState();
if (c == 0) {
// 如果当前状态值等于预期值,则自动将同步状态设置为给定的更新值
if (compareAndSetState(0, acquires)) {
// 设置当前拥有独占访问权限的线程
setExclusiveOwnerThread(current);
return true;
}
}
// 返回最后由setExclusiveOwnerThread设置的线程,如果从未设置,则返回null
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置同步状态的值
setState(nextc);
return true;
}
return false;
}

4.3 Semaphore 和 CountDownLatch

Semaphore

Semaphore 使用 AQS 类型的同步状态持有当前可用许可的数量。Semaphore 的 tryAcquireShared 和 tryAcquire Shared 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Semaphore implements java.io.Serializable {
abstract static class Sync extends AbstractQueuedSynchronizer {
// ......
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 同步状态的当前值
int available = getState();
int remaining = available - acquires;
// 剩余状态量小于零
if (remaining < 0 ||
// 如果当前状态值等于预期值,则自动将同步状态设置为给定的更新值
compareAndSetState(available, remaining))
return remaining;
}
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 同步状态的当前值
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 如果当前状态值等于预期值,则自动将同步状态设置为给定的更新值
if (compareAndSetState(current, next))
return true;
}
}
// ......
}

static final class NonfairSync extends Sync {
// ......
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
}
  • nonfairTryAcquireShared() 方法:首先计算剩余许可的数量,如果没有足够的许可,会返回一个值,表明获取操作失败。如果还有充足的许可剩余,nonfairTryAcquireShared 会使用 compareAndsetState(),尝试原子地递减许可的计数。
  • 类似地,tryReleaseShared() 会递增许可计数,这会潜在地解除等待中的线程的阻塞,不断地重试直到成功地更新。tryReleaseShared 的返回值表明,释放操作是否可以解除其他线程的阻塞。

CountDownLatch

CountDownLatch 使用 AQS 的方式与 Semaphore 相似,同步状态持有当前的计数:

  • countDown 方法调用 release,后者会导致计数器递减,并且在计数器已经到达零的时候,解除所有等待线程的阻塞

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    protected boolean tryReleaseShared(int releases) {
    // 递减计数;转换为零时的信号
    for (;;) {
    // 同步状态的当前值
    int c = getState();
    if (c == 0)
    return false;
    int nextc = c-1;
    // 如果当前状态值等于预期值,则自动将同步状态设置为给定的更新值
    if (compareAndSetState(c, nextc))
    return nextc == 0;
    }
    }
  • await 调用 acquire,如果计数器己经到达零,acquire 会立即返回,否则它会被阻塞。

    1
    2
    3
    protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
    }

4.4 ReentrantReadWriteLock

ReentrantReadWriteLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;

abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;

protected final boolean tryRelease(int releases) {
// ......
}
protected final boolean tryAcquire(int acquires) {
// ......
}
}
}

ReadwriteLock 的接口要求了两个锁,一个读者锁和一个写者锁。在基于 AQS 的 ReentrantReadMriteLock 实现中,一个单独的 AQS 子类管理了读和写的加锁。

ReentrantReadwriteLock 使用一个 16 位的状态为写锁(write-lock)计数,使用另一个 16 位的状态为读锁(read-lock)计数。

对读锁的操作使用共享的获取与释放的方法:对写锁的操作使用独占的获取与释放的方法。

Java并发编程实战:第14章 构建自定义的同步工具

https://osys.github.io/posts/d3e0.html

作者

Osys

发布于

2022年08月29日

更新于

2022年08月29日

许可协议

评论