Java并发编程实战:第14章 构建自定义的同步工具
1. 管理状态依赖性
1.1 管理状态依赖性
- 状态依赖性指某种操作必须依赖于指定的状态才可以执行。比如一个阻塞队列的
take
方法依赖于这个阻塞队列中有至少一个元素这个状态。 - 如果一个状态依赖性操作所依赖的状态不满足,通常有几种处理办法:
- 抛出异常
- 使用某种约定的错误返回值
- 阻塞,直到依赖的状态被满足
状态依赖的可阻塞行为结构
1 | void blockingAction() throws InterruptedException { |
生产者-消费者的设计经常会使用 ArrayBlockingQueue 这种有限缓存。一个有限缓存提供的 put 和 take 操作,每一个都有先验条件:不能从空缓存中获取元素,也不能把元素置入已满的緩存中。如果依赖于状态的操作在处理先验条件时失败,可以抛出异常或者返回错误状态(把问题留给调用者),也可以保持阻塞直到对象转入正确的状态。
示例:有限缓存的实现
1 | import net.jcip.annotations.GuardedBy; |
1.2 将先验条件失败传给调用者
如果有缓存不满足先验条件,会停滞不前
1 | import net.jcip.annotations.ThreadSafe; |
如果向 GrumpyBoundedBuffer 中添加、拿取元素,GrumpyBoundedBuffer 已经满了、没有元素,那么将抛出 RuntimeException。
调用 GrumpyBoundedBuffer
1 | public class GrumpyBoundedBufferDemo { |
如果 GrumpyBoundedBuffer 中没有元素,那么 sleep,循环等待有元素可用。
1.3 利用 “轮询加休眠” 实现拙劣的阻塞
有限缓存使用了拙劣的阻塞
1 | import net.jcip.annotations.ThreadSafe; |
SleepyBoundedBuffer 与 GrumpyBoundedBuffer 不同的是,在 put 和 take 操作时,使用轮询+休眠的方式,直到满足条件。
像大多数行为良好的阻塞库的方法一样,SleepyBoundedBuffer 通过中断支持取消,如果被中断,它会提前返回,并抛出 InterruptedException。
1.4 使用条件队列解决拙劣的阻塞
条件队列可以让一组线程(等特集),以某种方式等待相关条件变成真。不同于传统的队列,它们的元素是数据项:条件队列的元素是等待相关条件的线程。
有限缓存使用条件队列
1 | import net.jcip.annotations.ThreadSafe; |
使用了超类 Object 中的 wait()、notify()、notifyAll()方法:
1 | // 使当前线程等待,直到另一个线程为此对象调用notify()方法或notifyAll()方法。 |
2. 使用条件队列
条件队列让构建有效且响应的状态依赖类变得更容易,但是使用时比较容易出错。应该尽量构建在像 LinkedBlockingQueue、Latch、Semaphore 和 FutureTask 基础之上。
2.1 条件谓词
条件调词是先验条件的第一站,它在一个操作与状态之间建立起依赖关系。
在有限缓存中,只有缓存不为空时 take 才能执行,否则它必须等待。就 take 而言,它的条件谓词是“绶存不空”,take 执行前必须先测试。类似地,put 的条件谓词是“缓存不满”。
在条件等待中存在一种重要的三元关系,包括:加锁、wait() 方法、条件谓词
每次调用 wait 都会您式地与特定的永什调调相关联。当调用特定条件谓词的 wait 时,调用者必须已经持有了与条件队列相关的锁,这个锁必须同时还保护着组成条件谓词的状态变量。
2.2 过早地唤醒
当线程A调用到 wait() 代码后,其它线程在此期间插足了 —- 获取了锁,并且修改了数据。那么线程A获取锁后,需要重新检查条件谓词。例如:
1 | public synchronized void put(V v) throws InterruptedException { |
【线程A】调用空缓存中的 take(),不满足条件(绶存不空),于是调用 wait() 使当前线程等待。
【线程B】调用该空缓存中的 put(),满足条件(缓存不满),于是将一个元素放入缓存,并且调用 notifyAll(),唤醒【线程A】。
【线程A】此时需要再次检查条件谓词是否满足。
不过有的时候,我们可能根本不知道哪个方法调用了 notify() 或 notifyAll(),唤醒正在 wait() 的线程。如果被唤醒的线程去检查条件谓词,还不能满足条件,就又要继续等待。
状态依赖方法的规范式
1 | void stateDependentMethod() throws InterruptedException { |
当使用条件等待时(Object.wait 或者 Condition.await)
- 永远设置一个条件谓词 —- 一些对象状态的测试,线程执行前必须满足它;
- 永远在调用 wait 前测试条件谓词,并且从 wait 中返回后再次测试;
- 永远在循环中调用 wait;
- 确保构成条件谓词的状态变量被锁保护,而这个锁正是与条件队列相关联的;
- 当调用 wait、 notify 或者 notifyAll 时,要持有与条件队列相关联的锁;并且在检查条件谓词之后、开始执行被保护的逻辑之前,不要释放锁。
2.3 丢失的信号
保证 notify()
一定在 wait()
之后。
2.4 通知
在有限缓存中,在绶存变为非空时,为了能够让 take 解除阻塞,我们必须确保每一条能够让缓存变为非空的代码路径都执行一个通知。
例如:
1
2
3
4
5
6
7
8
9
10public synchronized void alternatePut(V v) throws InterruptedException {
while (isFull()) {
wait();
}
boolean wasEmpty = isEmpty();
doPut(v);
if (wasEmpty) {
notifyAll();
}
}向有限缓存中添加一个元素,在最后调用 notifyAll() 通知正在 wait() 的线程。
对于 notify() 和 notifyAll() 无论调用哪一个,都必须持有与条件队列对象相关联的锁。
- 调用 notify() 的结果是:JVM 会从在这个条件队列中等待的众多线程中挑选出一个,并把它唤醒。
- 而调用 notifyAll() 会唤醒所有正在这个条件队列中等待的线程。
- 由于调用 notify() 和 notifyAll() 时必须持有条件队列对象的锁,这导致等待线程此时不能重新获得锁,无法从 wait() 返回,因此该通知线程应该尽快释放锁,以确保等待线程尽可能快地解除阻塞。
推荐使用 notifyAll(),因为多个线程可能会由于不同的原因在同一个条件队列中等待,调用 notifyAll() 会唤醒所有,而调用 notify() 只会由 JVM 挑选一个唤醒。
2.5 阀门类
开始阀门闭锁(starting gate latch)
二元闭锁:
1 | import java.util.concurrent.CountDownLatch; |
这里通过将计数器初始化为1,创建了一个二元闭锁:它只有两种状态,初始状态和终止状态。闭锁会阻止线程通过开始阀门,直到阀门被打开,此时所有的线程都可以通过。
使用 wait 和 notifyAll 实现可重关闭的阀门
1 | import net.jcip.annotations.GuardedBy; |
2.6 显式的 Condition 对象
Lock 是一个广义的内置锁,而 Condition 也是一种广义的 内置条件队列。
1 | package java.util.concurrent.locks; |
- 内部条件队列有一些缺陷。每个内部锁只能有一个与之相关联的条件队列,多个线程可能为了不同的条件谓词在同一个条件队列中等待,而且大多数常见的锁模式都会暴露条件队列对象。这些因素都导致不可能为了使用notifyAll(),而强迫等待线程统一。
- 如果想编写一个带有多个条件谓词的并发对象,或者想获得除了条件队列可见性之外的更多的控制权,可以使用Lock和Condition,而不是内置锁和内置条件队列。
- 一个 Conaition 和一个单独的 Lock 相关联,就像条件队列和单独的内部锁相关联一样;调用与 Condition 相关联的 Lock 的 Lock.newConaition() 方法,可以创建一个 Conaition。如同 Lock 提供了比内部加锁要丰富得多的特征集一样,Condition 也提供了比内部条件队列要丰富得多的特征集:每个锁可以有多个等待集、可中断/不可中断的条件等待、基于时限的等待以及公平/非公平队列之间的选择。
- 不同于内部条件队列,你可以让每个 Lock 都有任意数量的 Condition 对象。
警告: Object 的 wait()、notify() 和notifyAll()方法在 Conaition 中的对等体是 await()、 signal() 和 BignalAll()。
但是 Condition 继承自 Object,这意味着它也有 wait() 和 notify() 方法。
一定要确保使用了正确的版本 —- await() 和 signal()
有限缓存使用显式的条件变量:
1 | import net.jcip.annotations.GuardedBy; |
Condition 简化了使用单一通知的条件。使用更有效的 signal(),而不是 signalAll(),这就会减少相当数量的上下文转换,而且每次缓存操作都会触发对锁的请求。
3. 剖析 Synchronizer
ReentrantLock 和 Semaphore 有很多共同点。这些类都扮演了“阀门”的角色,每次只允许有限数目的线程通过它。
线程到达阀门后
- 可以允许通过:ReentrantLock.lock()、Semaphore.acquire()
- 可以等待:lock()方法 或 acquire() 方法阻塞
- 可以取消:ReentrantLock.tryLock()、Semaphore.tayAcquire() 返回 false,指明在允许的时间内,【锁】或者【许可】不可用
它们都允许可中断的、不可中断的、可限时的请求尝试,它们也都允许选择公平、非公平的等待线程队列。
其实,ReentrantLock 和 Semaphore 都用到了同一个共同的基类:AbstractQueuedSynchronizer
AbstractQueuedSynchronizer(AQS) 和其它很多的 Synchronizer 一样,AQS 是一个用来构建锁和 Synchronizer 的框架。
CountDownLatch、ReentrantReadWriteLock 和 FutureTask 都是基于 AQS 构建的。
使用 Lock 实现的计数信号量
1 | import net.jcip.annotations.GuardedBy; |
在 SemaphoreonLock 中,请求许可的操作在两个地方可能会阻塞
- 信号量的状态正在被锁保护着
- 许可不可用
AQS 解决了实现一个 Synchronizer 的大量细节,比如等待线程的 FIFO(First Input, First Output) 队列。单独的 Synchronizer 可以定义一个灵活的标准,用来描述线程是否应该允许通过,还是需要等待。
使用 AQS 构建的 Synchronizer 只可能在一个点上发生阻塞,这样降低了上下文切换的开销,并提高了吞吐量。
4. AbstractQueuedSynchronizer
4.1 AbstractQueuedSynchronizer
一个基于 AQS 的 synchronizer 所执行的基本操作,是一些不同形式的获取(acquire) 和释放(release)。
AQS 管理着一个广域状态信息的单一整数,状态信息可以通过 getState()、setState()、compareAndSetState()等方法进行操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14// 返回同步状态的当前值。此操作具有volatile读取的内存语义
protected final int getState() {
return state;
}
// 设置同步状态的值。此操作具有volatile写入的内存语义
protected final void setState(int newState) {
state = newState;
}
// 如果当前状态值等于预期值,则自动将同步状态设置为给定的更新值。此操作具有volatile读写的内存语义
protected final boolean compareAndSetState(int expect, int update) {
// ...
}- ReentrantLock 用它来表现拥有它的线程已经请求了多少次锁
- Semaphore 用它来表现剩余的许可数
- Futurerask 用它来表现任务的状态(尚未开始、运行、完成和取消)
AQS中获取和释放操作的规范式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20boolean acquire() throws InterruptedException {
while (状态不允许获取) {
if (阻止获取请求) {
如果尚未排队阻塞当前线程,
} else {
否则则将当前线程入队
return failure
}
}
可能更新同步状态
如果线程已排队,则使线程出队
return success
}
void release() {
更新同步状态
if (新状态可能允许被阻塞的线程获取) {
解除阻塞一个或多个排队线程
}
}一个获取操作分为两步。第一步,Synchronizer 判断当前状态是否允许被获得:如果是,就让线程执行,如果不是,获取操作阻塞或失败。第二步包括了可能需要更新的状态;一个想获取 synchronizer 的线程会影响到其他线程是否能够获取它。
一个简单的闭锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28import net.jcip.annotations.ThreadSafe;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class OneShotLatch {
private final Sync sync = new Sync();
public void signal() {
sync.releaseShared(0);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(0);
}
private class Sync extends AbstractQueuedSynchronizer {
protected int tryAcquireShared(int ignored) {
// 如果闭锁打开(state == 1)则成功,否则失败
return (getState() == 1) ? 1 : -1;
}
protected boolean tryReleaseShared(int ignored) {
setState(1); // 闭锁现已打开
return true; // 其他线程现在可能能够获取
}
}
}
4.2 ReentrantLock
ReentrantLock 只支持独占的获取操作,因此它实现了 tryAcquire()、 tryRelease() 和 isHeldExclusively()
1 | public class ReentrantLock implements Lock, java.io.Serializable { |
非公平的 ReentrantLock 中 tryAcquire 的实现
1 | final boolean nonfairTryAcquire(int acquires) { |
4.3 Semaphore 和 CountDownLatch
Semaphore
Semaphore 使用 AQS 类型的同步状态持有当前可用许可的数量。Semaphore 的 tryAcquireShared 和 tryAcquire Shared 方法:
1 | public class Semaphore implements java.io.Serializable { |
- nonfairTryAcquireShared() 方法:首先计算剩余许可的数量,如果没有足够的许可,会返回一个值,表明获取操作失败。如果还有充足的许可剩余,nonfairTryAcquireShared 会使用 compareAndsetState(),尝试原子地递减许可的计数。
- 类似地,tryReleaseShared() 会递增许可计数,这会潜在地解除等待中的线程的阻塞,不断地重试直到成功地更新。tryReleaseShared 的返回值表明,释放操作是否可以解除其他线程的阻塞。
CountDownLatch
CountDownLatch 使用 AQS 的方式与 Semaphore 相似,同步状态持有当前的计数:
countDown 方法调用 release,后者会导致计数器递减,并且在计数器已经到达零的时候,解除所有等待线程的阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13protected boolean tryReleaseShared(int releases) {
// 递减计数;转换为零时的信号
for (;;) {
// 同步状态的当前值
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
// 如果当前状态值等于预期值,则自动将同步状态设置为给定的更新值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}await 调用 acquire,如果计数器己经到达零,acquire 会立即返回,否则它会被阻塞。
1
2
3protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
4.4 ReentrantReadWriteLock
ReentrantReadWriteLock
1 | public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { |
ReadwriteLock 的接口要求了两个锁,一个读者锁和一个写者锁。在基于 AQS 的 ReentrantReadMriteLock 实现中,一个单独的 AQS 子类管理了读和写的加锁。
ReentrantReadwriteLock 使用一个 16 位的状态为写锁(write-lock)计数,使用另一个 16 位的状态为读锁(read-lock)计数。
对读锁的操作使用共享的获取与释放的方法:对写锁的操作使用独占的获取与释放的方法。
Java并发编程实战:第14章 构建自定义的同步工具