1. 锁的劣势
volatile 变量
- 然而,volatile 变量与锁相比有一些局限性:尽管它们提供了相似的可见性保证,但是它们不能用于构建原子化的复合操作。
- 当一个变量依赖其他变量时,或者当变量的新值依赖于旧值时,是不能用 volatile 变量的。这些都限制了 volatile 变量的使用,因此它们不能用于实现可靠的通用工具,比如计数器,或互斥体(mutex)
例如 i++
看起来是原子操作(变量的新值依赖于旧值),事实上有三个独立操作:获取当前值、为该值加一、协会更新值。为了不丢失更新,整个的 读-写-改
| import net.jcip.annotations.GuardedBy; import net.jcip.annotations.ThreadSafe;
@ThreadSafe public final class Counter { @GuardedBy("this") private long value = 0;
public synchronized long getValue() { return value; }
public synchronized long increment() { if (value == Long.MAX_VALUE) { throw new IllegalStateException("counter overflow"); } return ++value; } }
- 加锁还有其它的缺点。当一个线程正在等待锁时,它不能做任何其它事情。
- 如果一个线程在持有锁的情况下发生了延迟(如:页错误、调度延迟),那么其它所有需要改锁的线程都不能前进了。
- 如果阻塞的线程是优先级很高的线程,持有锁的线程优先级较低,那么就导致了优先级倒置(priority inversion)
2. 硬件对并发的支持
- 乐观锁:乐观锁在操作数据时非常乐观,认为多个线程不会同时修改数据。因此乐观锁不会上锁,只是在执行更新的时候判断一下在此期间别人是否修改了数据:如果别人修改了数据则放弃操作,否则执行操作。
- 悲观锁:悲观锁在操作数据时比较悲观,认为别人会同时修改数据。因此操作数据时直接把数据锁住,直到操作完成后才会释放锁;上锁期间其他人不能修改数据
- 独占锁(如ReentrantLock)模式下,每次只能有一个线程能持有锁。它是一项 悲观 保守的加锁策略。
- 对于细粒度的操作,可以使用乐观锁,在不发生干扰的情况下完成更新操作。
2.1 比较并交换(compare-and-swap)
- 当多个线程试图使用比较并交换(CAS),同时更新相同的变量时,其中一个线程会胜出,并且更新变量的值,而其它的线程都会失败。
- 失败的线程允许尝试再次比较并交换操作。一个线程在竞争 CAS 时失败,不会被阻塞,它可以决定是否重试,这样的灵活性大大减少了锁相关的活跃度风险。
| import net.jcip.annotations.GuardedBy; import net.jcip.annotations.ThreadSafe;
@ThreadSafe public class SimulatedCAS { @GuardedBy("this") private int value;
public synchronized int get() { return value; }
public synchronized int compareAndSwap(int expectedValue, int newValue) { int oldValue = value; if (oldValue == expectedValue) { value = newValue; } return oldValue; }
public synchronized boolean compareAndSet(int expectedValue, int newValue) { return (expectedValue == compareAndSwap(expectedValue, newValue)); } }
2.2 非阻塞计数器
- 当一个线程试图进入另一个线程正在执行的同步块或方法时会触发锁竞争。该线程会被强制进入等待状态,直到第一个线程执行完同步块并且已经释放了监视器。
- 当同一时间只有一个线程尝试执行同步的代码区域时,锁会保持非竞争的状态。
使用 CAS 实现的非阻塞计数器
| import net.jcip.annotations.ThreadSafe;
@ThreadSafe public class CasCounter { private SimulatedCAS value;
public int getValue() { return value.get(); }
public int increment() { int value0; int compareAndSwap; do { value0 = this.value.get(); compareAndSwap = this.value.compareAndSwap(value0, value0 + 1); } while (value0 != compareAndSwap); return value0 + 1; } }
Cascounter 利用 CAS 实现了线程安全的计数器。自增操作遊循了经典形式 —- 取得旧值,根据它计算出新值(加1),并使用 CAS 设定新值。如果比较并交换(CAS)失败,立即重试该操作。
对于普通的 Counter
| import net.jcip.annotations.GuardedBy; import net.jcip.annotations.ThreadSafe;
@ThreadSafe public final class Counter { @GuardedBy("this") private long value = 0;
public synchronized long getValue() { return value; }
public synchronized long increment() { return ++value; } }
- 如果自增,使用 value++ 的形式,那么需要对 increment() 加锁。
- 自增,使用比较并交换(CAS)的形式,自增失败后进行重试,不采用 increment() 加锁的形式。
- 比较并交换 compareAndSwap() 方法,加锁了。
- 基于 CAS 的计数器看起来比基于锁的计数器性能要差一些。CasCounter 有更多的操作和更复杂的控制流,表面看来还依赖于复杂的 CAS 操作。实质情况:
- CasCounter.increment() 方法没有使用锁,Counter.increment() 方法使用了锁。
- CasCounter.increment() 方法,使用了比较并交换(CAS),比较并交换 compareAndSwap() 方法是加锁的。
- CasCounter 到达锁的路径是 CasCounter -> increment() -> compareAndSwap()
- Counter 到达锁的路径是 Counter -> increment()
- 在多线程环境下,CasCounter 通过不断尝试以成功,使得对硬件设备的使用率更高;Counter 通过挂起线程,减轻了硬件设备的压力,在硬件设备允许的条件下,CasCounter 性能往往高于 Counter。
3. 原子变量类
原子变量类,提供了广义的 volatile 变量,以支持原子的、条件的读-写-改操作。
- 布尔类型原子类
- 整型原子类
- 长整型原子类
- 引用类型原子类
- 带有标记位的引用类型原子类
- 带有版本号的引用类型原子类
- 整形数组原子类
- 长整型数组原子类
- 引用类型数组原子类
- 整型字段的原子更新器
- 长整型字段的原子更新器
- 原子更新引用类型里的字段
3.1 原子变量是“更佳的volatile”
NumberRange 例子
| import java.util.concurrent.atomic.AtomicInteger;
public class NumberRange { private final AtomicInteger lower = new AtomicInteger(0); private final AtomicInteger upper = new AtomicInteger(0);
public void setLower(int i) { if (i > upper.get()) { throw new IllegalArgumentException("can't set lower to " + i + " > upper"); } lower.set(i); }
public void setUpper(int i) { if (i < lower.get()) { throw new IllegalArgumentException("can't set upper to " + i + " < lower"); } upper.set(i); }
public boolean isInRange(int i) { return (i >= lower.get() && i <= upper.get()); } }
不能使用原子化的整数来存储边界。因为一个不变约束限制了两个数值,并且它们不能在遵循约束的情况下被同时更新,如果一个值域类使用 volatile 引用,或者使用多个原子化的整数,将会造成不安全的“检查再运行”顺序。
使用 CAS 避免多元的不变约束
| import net.jcip.annotations.Immutable; import net.jcip.annotations.ThreadSafe;
import java.util.concurrent.atomic.AtomicReference;
@ThreadSafe public class CasNumberRange { @Immutable private static class IntPair { final int lower; final int upper;
public IntPair(int lower, int upper) { this.lower = lower; this.upper = upper; } }
private final AtomicReference<IntPair> values = new AtomicReference<IntPair>(new IntPair(0, 0));
public int getLower() { return values.get().lower; }
public int getUpper() { return values.get().upper; }
public void setLower(int i) { while (true) { IntPair oldValue = values.get(); if (i > oldValue.upper) { throw new IllegalArgumentException("Can't set lower to " + i + " > upper"); } IntPair newValue = new IntPair(i, oldValue.upper); if (values.compareAndSet(oldValue, newValue)) { return; } } }
public void setUpper(int i) { while (true) { IntPair oldValue = values.get(); if (i < oldValue.lower) { throw new IllegalArgumentException("Can't set upper to " + i + " < lower"); } IntPair newValue = new IntPair(oldValue.lower, i); if (values.compareAndSet(oldValue, newValue)) return; } } }
把 OnevalueCache 的技术结合到原子化引用中,通过原子化地更新持有上下边界的不变类的引用,来缩小竞争条件。CasNumberRange 使用了 AtomicReference 和 IntPair 来保持状态:通过使用 compareAndSet,它能够避开 NumberRange 的竞争条件,更新上下界。
3.2 性能比较:锁与原子变量
使用 ReentrantLock 实现随机数字生成器
| import net.jcip.annotations.ThreadSafe;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
@ThreadSafe public class ReentrantLockPseudoRandom extends PseudoRandom { private final Lock lock = new ReentrantLock(false); private int seed;
ReentrantLockPseudoRandom(int seed) { this.seed = seed; }
public int nextInt(int n) { lock.lock(); try { int s = seed; seed = calculateNext(s); int remainder = s % n; return remainder > 0 ? remainder : remainder + n; } finally { lock.unlock(); } } }
使用 AtomicInteger 实现随机数字生成器
| import net.jcip.annotations.ThreadSafe;
import java.util.concurrent.atomic.AtomicInteger;
@ThreadSafe public class AtomicPseudoRandom extends PseudoRandom { private AtomicInteger seed;
AtomicPseudoRandom(int seed) { this.seed = new AtomicInteger(seed); }
public int nextInt(int n) { while (true) { int s = seed.get(); int nextSeed = calculateNext(s); if (seed.compareAndSet(s, nextSeed)) { int remainder = s % n; return remainder > 0 ? remainder : remainder + n; } } } }
锁通过挂起线程来影响竞争,将小了CPS的利用和功效内存总线上的同步通信量。原子变量吧竞争光里退回给调用类。与大多数基于 CAS 的算法相比,AtomicPseudoRandom 通过不断反复尝试来响应竞争,在激烈竞争环境下会带来更多的竞争。

图中为书作者研究 Lock 和 AtomicInteger 在激烈竞争下的性能比较。不存在任何一个程序,除了竞争锁或原子变量,其它什么都不做。在实践中,原子化的伸缩性比锁更好(硬件的影响对原子化伸缩性影响较大)。
4. 非阻塞算法
- 非阻塞(nonblocking)算法:一个线程的失败或挂起不应该影响其他线程的失败或挂起
- 锁自由(lock-frce)算法:如果算法的每一步骤中都有一些线程能够继续执行,那么这样的算法称为锁自由算法。
在线程间使用 CAS 进行协调,这样的算法如果能构建正确的话,它既是非阻塞的, 又是锁自由的。
4.1 非阻塞栈
- 实现同等功能前提下,非阻塞算法被认为比基于锁的算法更加复杂。创建非阻寨算法的前提是为维护数据的一致性,解决如何把原子化范围缩小到一个唯一变量。
- 非阻塞算法的特性:某项工作的完成具有不确定性,必须重新执行。
- 非阻塞算法中能确保线程安全性,因为 compareAndSet 像锁定机制一样,既能提供原子性,又能提供可见性。
使用 Treiber 算法 (Treiber, 1986) 的非阻塞栈
| import net.jcip.annotations.ThreadSafe;
import java.util.concurrent.atomic.AtomicReference;
@ThreadSafe public class ConcurrentStack<E> { AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();
public void push(E item) { Node<E> newHead = new Node<E>(item); Node<E> oldHead; boolean cas = false; do { oldHead = top.get(); newHead.next = oldHead; cas = top.compareAndSet(oldHead, newHead); } while (!cas); }
public E pop() { Node<E> oldHead; Node<E> newHead; boolean cas = false; do { oldHead = top.get(); if (oldHead == null) { return null; } newHead = oldHead.next; cas = top.compareAndSet(oldHead, newHead); } while (!cas); return oldHead.item; }
private static class Node<E> { public final E item; public Node<E> next;
public Node(E item) { this.item = item; } } }
4.2 非阻塞链表
一个链表比栈更加复杂,因为它需要支持首尾的快速访问。为了实现,它会维护独立的队首指针和队尾指针。两个指针初始时都指向队列的末尾节点。在成功加入新元素时,两个节点都需要更新 —- 原子化更新。
Michael-Scott 非阻塞队列算法中的插入(Michael 与 Scott, 1996)
| import net.jcip.annotations.ThreadSafe;
import java.util.concurrent.atomic.AtomicReference;
@ThreadSafe public class LinkedQueue <E> {
private final Node<E> dummy = new Node<E>(null, null); private final AtomicReference<Node<E>> head = new AtomicReference<Node<E>>(dummy); private final AtomicReference<Node<E>> tail = new AtomicReference<Node<E>>(dummy);
public boolean putHead(E item) { Node<E> newHead = new Node<E>(item, null); while (true) { Node<E> curHead = head.get(); if (curHead == head.get()) { if (newHead.next.compareAndSet(null, curHead)) { return head.compareAndSet(curHead, newHead); } } } }
public boolean putTail(E item) { Node<E> newNode = new Node<E>(item, null); while (true) { Node<E> curTail = tail.get(); Node<E> tailNext = curTail.next.get(); if (curTail == tail.get()) { if (tailNext != null) { tail.compareAndSet(curTail, tailNext); } else if (curTail.next.compareAndSet(null, newNode)) { return tail.compareAndSet(curTail, newNode); } } } } private static class Node <E> { final E item; final AtomicReference<Node<E>> next;
public Node(E item, Node<E> next) { this.item = item; this.next = new AtomicReference<Node<E>>(next); } } }
4.3 原子化的域更新器
在 ConcurrentLinkedQueue 中使用原子化的域更新器
| private class Node<E> { private final E item; private volatile Node<E> next; public Node(E item) { this.item = item; } }
private static AtomicReferenceFieldUpdater nextUpdater = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
4.4 ABA 问题
- ABA 问題是因为在算法中误用比较并交换而引起的反常现象,节点被循环使用
- 解决:更新一对值,包括引用和版本号,而不是仅更新该值的引用。