Java并发编程实战:第5章 构建块

1. 同步容器

1.1 同步容器

  • 在Java的集合容器框架中,主要有四大类别:List、Set、Queue、Map。

  • 其中List、Set、Queue接口分别继承了Collection接口,Map本身是一个接口。

  • 同步容器主要包括2类

    1. Vector、Stack

      1
      2
      Vector实现了List接口,Vector实际上就是一个数组,和ArrayList类似,但是它进行了同步措施。
      Stack也是一个同步容器,它的方法也用synchronized进行了同步,它实际上是继承于Vector类。
    2. HashTable

      1
      HashTable实现了Map接口,它和HashMap很相似,但是HashTable进行了同步处理,而HashMap没有。

1.2 同步容器中出现的问题

同步容器都是线程安全的。但是对于复合操作,可能需要使用额外的客户端加锁进行保护。

通常对容器的复合操作包括:

  • 迭代
  • 导航(根据一定的顺序寻找下一个元素)
  • 条件运算(缺少即加入……)

操作 Vector 的复合操作可能导致混乱结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import java.util.Vector;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class UnsafeVectorHelpers {
public static Object getLast(Vector list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}

public static void deleteLast(Vector list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}

对于 list.get()list.remove()list.size() 其本身是线程安全的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 返回此 Vector 中指定位置的元素。
*
* @param index 要返回的元素的索引
* @return 指定索引处的对象
* @throws ArrayIndexOutOfBoundsException 如果索引超出范围
* ({@code index < 0 || index >= size()})
* @从 1.2 开始
*/
public synchronized E get(int index) {
if (index >= elementCount)
throw new ArrayIndexOutOfBoundsException(index);

return elementData(index);
}

/**
* 移除此 Vector 中指定位置的元素。
* 将任何后续元素向左移动(从它们的元素中减去一个
* 指数)。返回从 Vector 中删除的元素。
*
* @throws ArrayIndexOutOfBoundsException 如果索引超出范围
* ({@code index < 0 || index >= size()})
* @param index 要删除的元素的索引
* @return 被移除的元素
* @从 1.2 开始
*/
public synchronized E remove(int index) {
modCount++;
if (index >= elementCount)
throw new ArrayIndexOutOfBoundsException(index);
E oldValue = elementData(index);

int numMoved = elementCount - index - 1;
if (numMoved > 0)
System.arraycopy(elementData, index+1, elementData, index,
numMoved);
elementData[--elementCount] = null; // Let gc do its work

return oldValue;
}

/**
* 返回此向量中的组件数。
*
* @return 此向量中的组件数
*/
public synchronized int size() {
return elementCount;
}

但是对于 getLast()deleteLast() 则不然。

1
2
3
4
5
6
7
8
9
1. 假如两个线程分别调用 getLast() 和 deleteLast() 方法

2. 它们先后获取到的 lastIndex 都是 10

3. 调用 deleteLast() 的线程先按照 lastIndex=10 删除了 Vector 的最后一个元素

4. 另一个调用 getLast() 的线程此时按照 lastIndex=10 获取 Vector 的一个元素(元素已经被删除了)

5. 抛出异常:ArrayIndexOutOfBoundsException(index)

使用客户端加锁,对 Vector 进行复合操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.Vector;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class SafeVectorHelpers {

private final Vector list;

public SafeVectorHelpers(Vector list) {
this.list = list;
}

public Object getLast() {
synchronized (list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}

public void deleteLast() {
synchronized (list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}
}

迭代中可能抛出 ArrayIndexOutOfBoundsException

1
2
3
for (int i=0; i<vector.size(); i++) {
doSomething(vector.get(i));
}

某个线程在操作 vector 的同时,并不能阻止别的线程对其进行操作,假如在迭代的过程中,其它线程操作删除了 vector 里面的一个元素,那么就会抛出 ArrayIndexOutOfBoundsException

使用客户端加锁进行迭代:

1
2
3
4
5
synchronized (vector) {
for (int i=0; i<vector.size(); i++) {
doSomething(vector.get(i));
}
}

这样会削弱并发性。在迭代的时候,其它线程并不能访问 vector

1.3 迭代器和 ConcurrentModificationException

对 Collection 进行迭代的标准方式是使用 Iterator。在迭代过程中,对 Collection 进行了修改操作,会导致抛出 ConcurrentModificationException 异常。

1
2
3
4
5
List<Object> widgetList = Collections.synchronizedList(new ArrayList<>());
// 可能地出 ConcurrentModificationException
for (Object obj : widgetList) {
doSomething(obj);
}
  • 上面说到 客户端加锁进行迭代 会影响并发性能。
  • 在迭代期间,对容器加锁的一个替代办法是复制容器
  • 因为复制是线程限制的,没有其他的线程能够在迭代期间对其进行修改,这样消除了ConcurrentModificationException 发生的可能性。(**容器仍然需要在复制期间对自己加锁**)。
  • 复制容器会有明显的性能开销:这样做是好是坏取决于许多因素,包括容器的大小、每一个元素的工作量、迭代操作相对于容器其他操作的频率,以及响应性和吞吐量的
    需求。

1.4 隐藏迭代器

迭代器加锁复制容器可以防止迭代器抛出ConcurrentModificationException异常。所有对共享容器进行迭代的地方都需要此操作。

在某些情况下,迭代器是隐藏起来的。如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class HiddenIterator {

private final Set<Integer> set = new HashSet<>();

public synchronized void add(Integer i) {
set.add(i);
}

public synchronized void remove(Integer i) {
set.remove(i);
}

public void addTenThings() {
Random random = new Random();
for (int i=0; i < 10 ; i++) {
add(random.nextInt());
}
System.out.println("DEBUG: 添加了十个元素 " + set);
}
}

在如下代码中将执行迭代操作:

1
System.out.println("DEBUG: 添加了十个元素 " + set);

编译器将字符串的连接操作转换为StringBuilder.append(Object),而这个方法又会调用容器AbstractCollection.toString()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 返回此集合的字符串表示形式。字符串
* 表示由集合中的元素列表组成
* 它们由其迭代器返回的顺序,括在方括号中
* (<tt>"[]"</tt>)。相邻元素由字符分隔
* <tt>"、"</tt>(逗号和空格)。元素被转换为字符串
* 通过 {@link String#valueOf(Object)}。
*
* @return 此集合的字符串表示形式
*/
public String toString() {
Iterator<E> it = iterator();
if (! it.hasNext())
return "[]";

StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) {
E e = it.next();
sb.append(e == this ? "(this Collection)" : e);
if (! it.hasNext())
return sb.append(']').toString();
sb.append(',').append(' ');
}
}

addTenThings()方法可能会抛出ConcurrentModificationException,因为在生成调试消息的过程中,AbstractCollection.toString()对容器进行迭代。

2. 并发容器

2.1 并发容器

Java5.0开始针对多线程并发访问设计,提供了并发性能较好的并发容器,引入了java.util.concurrent包。主要解决了两个问题:

  1. 根据具体场景进行设计,尽量避免synchronized,提供并发性。
  2. 定义了一些并发安全的复合操作,并且保证并发环境下的迭代操作不会出错。

2.2 ConcurrentHashMap

  • ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment的结构,一个Segment其实就是一个类HashTable的结构,Segment内部维护了一个链表数组
  • ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作,第一次Hash定位到Segment,第二次Hash定位到**元素所在链表的头部**
  • 该结构的带来的副作用是Hash的过程要比普通的HashMap要长,但是带来的好处是写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其他的Segment
  • ConcurrentHashMap让锁的粒度更精细一些,并发性能更好。

构造方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* initialCapacity --- 初始容量。给定指定的负载因子,实现执行内部大小调整以适应这么多元素。
* loadFactor -------– 用于建立初始表大小的负载因子(表密度)
* concurrencyLevel -– 估计的并发更新线程数。实现可以使用这个值作为大小提示。
* m –---------------- Map<K, V>
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {}

public ConcurrentHashMap() {}

2.3 Map附加的原子操作

因为 ConcurrentHashMap 不能够在独占访问中被加锁,我们不能使用客户端加锁来创建新的原子操作。

不过一些常见的复合操作,比如缺少即加入相等便移除相等便替换,都已被实现为原子操作。

并且这些操作已在 ConcurrentMap 接口中声明,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface ConcurrentMap<K,V> extends Map<K, V> {
// 只有当没有找到匹配K的值时才插入
V putIfAbsent (K key, V value);

// 只有当K与V匹配时才移除
boolean remove (K key, V value);

// 只有当K与 oldValue 匹配时才取代
boolean replace (K key, V oldvalue, V newValue);

// 只有当K匹配某值时才取代
V replace(K key, V newValue);
}

2.4 CopyOnWrite 容器

CopyOnWrite 通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。

优点:

  • CopyOnWrite容器 是一种读写分离的思想,不同的容器。
  • 这种读写分离的思想,对于并发的读更友好。

不足:

  • CopyOnWrite容器 写时复制机制,导致在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象。
  • 如果这些对象占用的内存比较大,那么这个时候很有可能造成频繁的Yong GCFull GC
  • CopyOnWrite容器 只能保证数据的最终一致性,不能保证数据的实时一致性。

3. 阻塞队列和生产者-消费者模式

3.1 阻塞队列和生产者-消费者模式

阻塞队列

  • 阻寨队列(Blocking queue)提供了可阻塞的 put 和 take 方法
  • 如果 Queue 己经满了,put 方法会被阻塞,直到有空间可用
  • 如果 Queue是空的,那么 take 方法会被阻塞,直到有元素可用
  • Queue 的长度可以有限,也可以无限,无限的 Queue 永远不会充满,所以它的 put 方法永远不会阻塞

例如:A洗盘子,并把洗好的盘子放在盘子架上,B从架子上得到盘子,并把它烘干。

1
2
3
4
5
1. 在这个场景中,盘子架充当了阻寨队列
2. 如果架子上没有盘子,消费者会一直等待,直到有盘子需要烘干
3. 如果架子被放满了,生产者会停止清洗直到架子上拥有新空间

每一个工人只与盘子架产生互动。他们不需要知道究竟存在多少生产者和消费者,或者谁生产了某个给定工作条目。

3.2 常见的阻塞队列

1
2
3
4
5
6
7
8
9
public class ArrayBlockingQueue<E> {}				// 基于数组的有界阻塞队列

public class LinkedBlockingQueue<E> {} // 基于链表的有界阻塞队列

public class PriorityBlockingQueue<E> {} // 优先级排序的无界阻塞队列。元素出队列的顺序按照优先级排序

public class DelayQueue<E extends Delayed>{} // 基于优先级队列的无界阻塞队列。队列中的元素只有到达规定的延时才能从队列中取出。

public class SynchronousQueue<E> {} // 阻塞队列,其中每个插入操作都必须等待另一个线程的相应删除操作。

3.3 阻塞队列常见的方法

1
2
3
4
5
6
7
8
9
add(E e)		将元素e插入到队列末尾,成功返回 true;失败 抛出异常
remove() 移除队首元素,成功返回 true;失败 抛出异常

poll() 移除并获取队首元素,若成功 返回队首元素;否则 返回null
peek() 获取队首元素,若成功 返回队首元素;否则 返回null

take() 获取并移除队首元素,如果队列为空则阻塞直到队列中有元素
put() 向队尾添加元素,如果队列满则等待直到可以添加
offer(E e) 向队尾添加元素,成功返回true;失败返回false

3.4 连续的线程限制

一个线程约束的对象完全由单一线程所有,但是所有权可以通过安全的发布被转移

安全发布确保了对象状态对新的所有者是可见的,并且因为原始的所有者不会再触及它,这样使得对象完全受限于新线程中

对于可变对象,生产者-消费者设计和阻塞队列一起,为生产者和消费者之间移交对象所有权提供了连续的线程限制

桌面搜索应用程序中的生产者和消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* 文件搜索应用程序中的生产者和消费者
*/
public class ProducerConsumer {

/**
* 文件检索工具类(生产者)
* 将给定的文件夹,获取里面的文件,添加到队列
* 将给定的文件,添加到队列
*/
static class FileCrawler implements Runnable {
/** 文件阻塞队列 */
private final BlockingQueue<File> fileQueue;
/** 文件过滤器 */
private final FileFilter fileFilter;
/** 文件 */
private final File root;

public FileCrawler(BlockingQueue<File> fileQueue, final FileFilter fileFilter, File root) {
this.fileQueue = fileQueue;
this.root = root;
this.fileFilter = new FileFilter() {
/**
* 测试指定的抽象路径名是否应包含在路径名列表中
* @param pathname 路径名
* @return true/false
*/
@Override
public boolean accept(File pathname) {
return pathname.isDirectory() || fileFilter.accept(pathname);
}
};
}

/**
* 文件是否已经加入到阻塞队列
* @param file 文件
* @return false 表示已经加入
*/
private boolean alreadyIndexed(File file) {
return false;
}

@Override
public void run() {
try {
crawl(root);
} catch (InterruptedException e) {
// 中断线程
Thread.currentThread().interrupt();
}
}

/**
* 文件检索(将文件添加到队列,文件夹忽略掉)
* @param root 要被检索的文件
*/
private void crawl(File root) throws InterruptedException {
// 获取一个抽象路径名数组(该路径对应可能是文件,也可能是文件夹)
File[] entries = root.listFiles(fileFilter);
if (entries != null) {
for (File entry : entries) {
if (entry.isDirectory()) { // 如果是路径,继续检索
crawl(entry);
} else if (!alreadyIndexed(entry)) { // 文件未加入队列。将文件加入阻塞队列
fileQueue.put(entry);
}
}
}
}
}

/**
* 消费者,消费文件
*/
static class Indexer implements Runnable {
/** 文件阻塞队列 */
private final BlockingQueue<File> queue;

public Indexer(BlockingQueue<File> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
while (true) {
indexFile(queue.take());
}
} catch (InterruptedException e) {
// 中断线程
Thread.currentThread().interrupt();
}
}

public void indexFile(File file) {
// 消费文件
};
}


/** 队列大小 */
private static final int BOUND = 10;
/** Java 虚拟机可用的处理器数量 */
private static final int N_CONSUMERS = Runtime.getRuntime().availableProcessors();

/**
* 开始桌面搜索
* @param roots 文件数组
*/
public static void startIndexing(File[] roots) {
BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
FileFilter filter = new FileFilter() {
/**
* 测试指定的抽象路径名是否应包含在路径名列表中
* @param pathname 路径名
* @return true/false
*/
@Override
public boolean accept(File pathname) {
return true;
}
};

// 检索文件,添加到阻塞队列中
for (File root : roots) {
new Thread(new FileCrawler(queue, filter, root)).start();
}

// 消费者,消费文件
for (int i = 0; i < N_CONSUMERS; i++) {
new Thread(new Indexer(queue)).start();
}
}
}

3.5 双端队列

DequeBlockingDeque,它们分别扩展了 Queue 和 BlockingQueve。

Deque 是一个双端队列,允许高效地在头和尾分别进行插入和移除

实现它们的分别有 ArrayDequeLinkedB1ockingDeaue

3.6 窃取工作

  • 双端队列使它们自身与一种叫做窃取工作的模式相关联。
  • 消费者生产者设计中,所有的消费者只共享一个工作队列。
  • 窃取工作的设计中,每一个消费者都有一个自己的双端队列。
  • 如果一个消费者完成了自己双端队列中的全部工作,它可以偷取其他消费者的双端队列中的末尾任务。
  • 因为工作者线程并不会竞争一个共享的任务队列,所以窃取工作模式比传统的生产者-消费者设计有更佳的可伸缩性。
  • 大多数时候它们访问自己的双端队列,减少竞争。当一个工作者必须要访问另一个队列时,它会从尾部截取,而不是从头部,从而进一步降低对双端队列的争夺。

4. 阻塞和中断的方法

线程会因为几种原因被阻塞暂停

  • 等待IO操作结束
  • 等待获得一个锁
  • 等待从Thread.sleep中唤醒
  • 或者是等待另一个线程的计算结果

当一个线程阻塞时,他通常被挂起,并且设置成线程阻塞的某个状态(BLOCKED -- 阻塞WAITING -- 等待TIMED_WAITING -- 等待),等到外部事件的发生触发将线程置回(RUNNABLE)状态重新获得调度的机会。

恢复中断状态,避免掩盖中断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import java.util.concurrent.BlockingQueue;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class TaskRunnable implements Runnable {
BlockingQueue<Task> queue;

@Override
public void run() {
try {
processTask(queue.take());
} catch (InterruptedException e) {
// 中断线程
Thread.currentThread().interrupt();
}
}

void processTask(Task task) {
// 处理任务
}

interface Task {
}
}

5. Synchronizer

5.1 Synchronizer

Synchronizer 是一个对象,它根据本身的状态调节线程的控制流

Synchronizer对象包含:BlockingQueue(阻塞队列)Semaphore(信号量)Barrier(关卡)闭锁(Latch)

5.2 阻塞队列(BlockingQueue)

  • 阻塞队列在容器类中,不仅作为对象的容器,而且能够协调生产者线程和消费者线程之间的控制流
  • 因为 takeput 会保持阻止状态直到队列进入了期望的状态(队列元素不为空,队列还能存放元素)。

5.3 闭锁(Latch)

闭锁是一种 Synchronizer,它可以延迟线程的进度直到线程到达终止状态

闭锁工作起来就像大门:直到闭锁达到终点状态之前,门一直是关闭的,没有线程能够通过,在终点状态到来的吋候,门开了,允许所有线程都通过。

一但闭锁到达了终点状态,它就不能够再改变状态了,所以它会永远保持敞开状态。

闭锁可以用来确保特定活动直到其他的活动完成后才发生,比如:

  • 确保一个计算不会执行,直到它需要的资源被初始化。
  • 确保一个服务不会开始,直到它依赖的其他服务都已经开始。
  • 等待,直到活动的所有部分都为继续处理作好充分准备,比如在多玩家的游戏中的所有玩家是否都准备就绪。这样的闭锁会在所有玩家准备就绪时,达到终点状态。

FutureTask

  • Futurerask 可以作为闭锁。
  • FutureTask 实现了 Future 接口和 Runnable 接口,它等价于一个携带结果的 Runnable,并且有三个状态:等待运行完成
  • 完成包括所有计算以任意的方式结束,包括正常结束、取消和异常。一旦 FutureTask 进入完成状态,它会永远停止在这个状态上。

在时序测试中,使用 CountDownLatch 来启动和停止线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import java.util.concurrent.CountDownLatch;

/**
* 在时序测试中,使用 CountDownLatch 来启动和停止线程
*
* CountDownLatch.class ---- 利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。
*/
public class TestHarness {
/**
* 使用 CountDownLatch 来启动和停止线程
* @param nThreads 要启动的线程数
* @param task 线程任务
* @return 线程等待,释放时间
* @throws InterruptedException 当线程等待、休眠或以其他方式被占用,并且线程在活动之前或期间被中断时抛出
*/
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException {
// 在线程可以通过await()方法之前,计数值为1,需调用countDown()方法的次数为1次
final CountDownLatch startGate = new CountDownLatch(1);
// 在线程可以通过await()方法之前,计数值为 nThreads,需调用countDown()方法的次数为 nThreads 次
final CountDownLatch endGate = new CountDownLatch(nThreads);
// 通过await()方法需满足:锁计数器为0、线程被中断、或者超过指定的等待时间
// 每调用countDown()方法,计数器减1

for (int i = 0; i < nThreads; i++) {
Thread aThread = new Thread() {
@Override
public void run() {
try {
// 线程等待
startGate.await();
try {
// 启动线程
task.run();
} finally {
// 减少锁存器的计数,如果计数达到零,则释放所有等待线程
endGate.countDown();
}
} catch (InterruptedException ignored) {}
}
};
aThread.start();
}

long start = System.nanoTime();
// startGate 锁计数器(原本定义为1),现在调用 countDown() 方法,减去1
// 此时,startGate 的锁计数器为0,可以通过 await() 方法了
// 因此就可以到达 task.run()
startGate.countDown();
// endGate 锁计数器原本定义为nThreads,也就是线程个数。
// 程序未出现异常情况,需要调用 countDown() 方法 nThreads 次
// endGate.countDown() 方法在每个线程的 finally {...} 中
// 因此需要 nThreads 个线程都完成,才能使得 锁计数器=0,这个时候才能通过 endGate.await() 方法
endGate.await();
long end = System.nanoTime();
return end - start;
}
}

使用 FutureTask 顶载稍后需要的數据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
* 使用 FutureTask 顶载稍后需要的數据
*/
public class Preloader {
/** 加载产品信息 */
ProductInfo loadProductInfo() throws DataLoadException {
return null;
}

/** 创建一个FutureTask ,它将在运行时执行给定的Callable */
private final FutureTask<ProductInfo> future = new FutureTask<>(new Callable<ProductInfo>() {
@Override
public ProductInfo call() throws DataLoadException {
return loadProductInfo();
}
});

private final Thread thread = new Thread(future);

public void start() { thread.start(); }

public ProductInfo get()
throws DataLoadException, InterruptedException {
try {
// 为已完成的任务返回结果或抛出异常(Callable执行结果或异常)
return future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DataLoadException) {
throw (DataLoadException) cause;
} else {
throw LaunderThrowable.launderThrowable(cause);
}
}
}

interface ProductInfo {
}
}

/** 数据加载异常 */
class DataLoadException extends Exception { }

/** Throwable 强制转换为 RuntimeException */
class LaunderThrowable {

/**
* 将未经检查的 Throwable 抛出。
*
* 如果 throwable 是 RuntimeException 则返回 Throwable。
* 如果 throwable 是 Error 则抛出 Error。
* 否者抛出 IllegalStateException。
*/
public static RuntimeException launderThrowable(Throwable throwable) {
if (throwable instanceof RuntimeException) {
return (RuntimeException) throwable;
} else if (throwable instanceof Error) {
throw (Error) throwable;
} else {
throw new IllegalStateException("Not unchecked", throwable);
}
}
}

5.4 信号量(Semaphore)

  • 计数信号量 用来控制能够同时访问某特定资源的活动的数量,或者同时执行某一给定操作的数量。

  • 计数信号量可以用来实现资源池或者给定一个容器限定边界。

Semaphore的主要方法:

1
2
3
4
void acquire() {} 				// 从这个信号量获取一个许可,如果没有许可,线程将被阻塞或者线程被中断
void release() {} // 释放一个许可,将其返回给信号量
int availablePermits() {} // 返回此信号量中当前可用的许可数
boolean hasQueuedThreads() {} // 查询是否有线程正在等待获取
  • Semaphore中管理着一组虚拟的许可(permit),许可的初始数量可通过构造函数来指定。
  • 在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。
  • 如果没有许可,那么 acquire() 方法将阻塞直到有许可(或者直到被中断、或者操作超时)。
  • 计算信号量的一种简化形式是二元信号量,即初始值为1的Semaphore。
  • 二元信号量可以用做互斥体(mutex),并具备不可重入的加锁语义。

使用信号量来约束容器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;

/**
* 使用信号量来约束容器
*/
public class BoundedHashSet <T> {
private final Set<T> set;

/** 信号量 */
private final Semaphore semaphore;

public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<>());
semaphore = new Semaphore(bound);
}

public boolean add(T obj) throws InterruptedException {
// 从这个信号量获取一个许可
semaphore.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(obj);
return wasAdded;
} finally {
if (!wasAdded) {
// 释放许可
semaphore.release();
}
}
}

public boolean remove(Object obj) {
boolean wasRemoved = set.remove(obj);
if (wasRemoved) {
// 释放许可
semaphore.release();
}
return wasRemoved;
}
}

5.5 关卡(Barrier)

关卡类似于闭锁,它们都能够阻塞一组线程,直到某些事件发生。

关卡与闭锁关键的不同在于,所有线程必须同时到达关卡点,才能继续处理。闭锁等待的是事件;关卡等待的是其他线程

CyclicBarrier

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(关卡点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

  • 当线程到达关卡点时,调用await()方法,线程会被阻塞,直到所有的线程都到达关卡点。
  • 如果所有的线程都到达了关卡点,关卡就会被突破,这样所有的线程都被释放,关卡会重置以备下一次使用。
  • 如果对await()的方法调用超时,或者阻塞中的线程被中断,那么关卡就被认为是失败的,所有调用await()未完成的线程,都通过BrokenBarrierException终止。

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

在一个细胞的自动系统中用 CyclicBarrier 协调计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
* 在一个细胞的自动系统中用 CyclicBarrier 协调计算
*/
public class CellularAutomata {
/** 细胞 */
private final Board mainBoard;
/** 关卡 */
private final CyclicBarrier barrier;
/** 工作蛋白 */
private final Worker[] workers;

public CellularAutomata(Board board) {
// 细胞
this.mainBoard = board;
// Java 虚拟机可用的处理器数量
int count = Runtime.getRuntime().availableProcessors();
// 关卡
this.barrier = new CyclicBarrier(count, new Runnable() {
@Override
public void run() {
// 细胞提交新 value
mainBoard.commitNewValues();
}
});
// 工作蛋白
this.workers = new Worker[count];
for (int i = 0; i < count; i++) {
// 设置子细胞
workers[i] = new Worker(mainBoard.getSubBoard(count, i));
}
}

private class Worker implements Runnable {
/** 细胞 */
private final Board board;

public Worker(Board board) {
this.board = board;
}

@Override
public void run() {
// 细胞还没转换好
while (!board.hasConverged()) {
for (int x = 0; x < board.getMaxX(); x++) {
for (int y = 0; y < board.getMaxY(); y++) {
// 设置新的 x,y 值
board.setNewValue(x, y, computeValue(x, y));
}
}
try {
// 关卡(细胞所有的位置都转换好了,才可继续)---- 所有线程都运行到这里才可以继续
barrier.await();
} catch (InterruptedException | BrokenBarrierException ex) {
return;
}
}
}

private int computeValue(int x, int y) {
// 在 (x,y) 里计算新 value
return 0;
}
}

public void start() {
// 启动线程
for (Worker worker : workers) {
new Thread(worker).start();
}
// 等待细胞转换
mainBoard.waitForConvergence();
}

/**
* 细胞
*/
interface Board {
/**
* x 坐标
* @return x
*/
int getMaxX();

/**
* y 坐标
* @return y
*/
int getMaxY();

/**
* x,y 对应的 value
* @param x x
* @param y y
* @return value
*/
int getValue(int x, int y);

/**
* x,y 设置新的 value
* @param x x
* @param y y
* @param value value
* @return 新 value
*/
int setNewValue(int x, int y, int value);

/** 提交新 value */
void commitNewValues();

/**
* 是否转换好
* @return true/false
*/
boolean hasConverged();

/**
* 等待转换
*/
void waitForConvergence();

/**
* 获取子细胞
* @param numPartitions 分区
* @param index 所处分区
* @return 子细胞
*/
Board getSubBoard(int numPartitions, int index);
}
}

6. 建立高效、可伸缩的高速缓存

6.1 为计算结果建立高效、可伸缩的高速缓存

几乎所有的服务器应用都会使用某种形式的缓存。

重用之前的计算结果能降低延迟,提高吞吐量,但却要消耗更多内存。

看上去简单的缓存,可能会将性能瓶颈转变成伸缩性瓶颈,即使缓存是用来提高单线程性能的。

6.2 使用 HashMap 和同步 来初始化缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import net.jcip.annotations.GuardedBy;

import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

/**
* 尝试使用 HashMap 和同步来初始化缓存
*/
public class Memoizer1<A, V> implements Computable<A, V> {
@GuardedBy("this")
private final Map<A, V> cache = new HashMap<>();
private final Computable<A, V> computer;

public Memoizer1(Computable<A, V> computer) {
this.computer = computer;
}

@Override
public synchronized V compute(A arg) throws InterruptedException {
// 从缓存中获取,查看缓存中是否存在该值
V result = cache.get(arg);
if (result == null) {
// 缓存中不存在该值,计算并返回
result = computer.compute(arg);
// 将就算返回的值,存入 map
cache.put(arg, result);
}
return result;
}
}


interface Computable <A, V> {
V compute(A arg) throws InterruptedException;
}

class ExpensiveFunction implements Computable<String, BigInteger> {
@Override
public BigInteger compute(String arg) {
return new BigInteger(arg);
}
}

Memorizer1Computable 实现类的计算结果缓存在Map<A, V> cache

因为HashMap不是线程安全的,为了保证并发性,Memorizer1用了个很保守的方法,对整个compute方法进行同步。这导致了Memorizer1会有很明显的可伸缩性问题。

当有很多线程调用compute方法,就会出现类似下图的情况(并发性能弱):

1

6.3 用 ConcurrentHashMap 来初始化缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* 用 ConcurrentHashMap 来初始化缓存
*/
public class Memoizer2<A, V> implements Computable<A, V> {
private final Map<A, V> cache = new ConcurrentHashMap<>();
private final Computable<A, V> computer;

public Memoizer2(Computable<A, V> computer) {
this.computer = computer;
}

@Override
public V compute(A arg) throws InterruptedException {
// 将就算返回的值,存入 map
V result = cache.get(arg);
if (result == null) {
// 缓存中不存在该值,计算并返回
result = computer.compute(arg);
// 将就算返回的值,存入 map
cache.put(arg, result);
}
return result;
}
}
  • ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment的结构,一个Segment其实就是一个类HashTable的结构,Segment内部维护了一个链表数组。该结构的带来的副作用是Hash的过程要比普通的HashMap要长,但是带来的好处是写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其他的Segment。ConcurrentHashMap让锁的粒度更精细一些,并发性能更好。

  • 当两个线程同时计算同一个值,它们并不知道有其它线程在做同一的事,存在着资源被浪费的可能。如下图:

    2

6.4 使用 ConcurrentHashMap+FutureTask 来初始化缓存

  • 闭锁是可以延迟线程的进度直到线程到达终止状态。Futurerask 可以作为闭锁。
  • 闭锁工作起来就像大门:直到闭锁达到终点状态之前,门一直是关闭的,没有线程能够通过,在终点状态到来的吋候,门开了,允许所有线程都通过。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/**
* 用 FutureTask 记录包装器来初始化缓存
*/
public class Memoizer3<A, V> implements Computable<A, V> {
private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> computer;

public Memoizer3(Computable<A, V> computer) {
this.computer = computer;
}

@Override
public V compute(final A arg) throws InterruptedException {
// 从缓存中获取,查看缓存中是否存在该值
Future<V> cacheFuture = cache.get(arg);
if (cacheFuture == null) {
Callable<V> eval = new Callable<V>() {
@Override
public V call() throws InterruptedException {
// 缓存中不存在该值,计算并返回
return computer.compute(arg);
}
};
// 将Future<V>,存入 map
FutureTask<V> task = new FutureTask<>(eval);
cacheFuture = task;
cache.put(arg, task);
// 执行 FutureTask,计算结果
task.run();
}
try {
// 获取结果并返回
return cacheFuture.get();
} catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e.getCause());
}
}
}
  • Memorizer3 缓存的不是计算的结果,而是进行运算的 Future<V> cacheFuture

  • 因此首先检查有没有执行该任务的FutureTask

  • 如果执行该任务的FutureTask,则直接获得FutureTask

    1
    cacheFuture != null
    • 假如计算已经完成,FutureTask.get()方法可以立刻获得结果

      1
      return cacheFuture.get();
    • 假如计算未完成,后进入的线程阻塞直到get()返回结果;

  • 如果没有执行该任务的FutureTask,则创建一个FutureTask进行运算,后续进了的同样的运算可以直接拿到结果或者等待运算完成获得结果。

    1
    2
    if (cacheFuture == null) { ... }
    return cacheFuture.get();
  • 此程序仍然存在一个问题,

    1. 当A线程判断没有对应的cache.get(arg);缓存为空(即cache.get(arg)==null

    2. A线程创建FutureTask,还没进行cache.put(arg, task);操作

      1
      FutureTask<V> task = new FutureTask<>(eval);
    3. 这个时候,B线程判断缓存也是为空的(即cache.get(arg)==null),因为A线程还没进行cache.put(arg, task);操作

    4. B线程创建的FutureTask,就会会把A创建的FutureTask覆盖掉。

    5. 如下图:

      3

6.5 ConcurrentHashMap + FutureTask + Map原子操作 来初始化缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/**
* ConcurrentHashMap + FutureTask + Map原子操作 来初始化缓存
*/
public class Memoizer<A, V> implements Computable<A, V> {
private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> computer;

public Memoizer(Computable<A, V> computer) {
this.computer = computer;
}

@Override
public V compute(final A arg) throws InterruptedException {
while (true) {
// 从缓存中获取,查看缓存中是否存在该值
Future<V> cacheFuture = cache.get(arg);
if (cacheFuture == null) {
Callable<V> eval = new Callable<V>() {
@Override
public V call() throws InterruptedException {
// 缓存中不存在该值,计算并返回
return computer.compute(arg);
}
};
// 将就算返回的值,存入 map(如果存在,就将返回值存入map)
FutureTask<V> task = new FutureTask<>(eval);
cacheFuture = cache.putIfAbsent(arg, task);
// 查看是否存在该 FutureTask,如果存在,那么计算结果
if (cacheFuture == null) {
cacheFuture = task;
task.run();
}
}
try {
// 获取结果并返回
return cacheFuture.get();
} catch (CancellationException e) {
cache.remove(arg, cacheFuture);
} catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e.getCause());
}
}
}
}

程序做了两点改进:

  1. FutureTask<V> 存入 Map 时,使用 ConcurrentHashMap.putIfAbsent() 方法,使得原本复合的插入操作,原子化

  2. 使用了 while (true) { ... } ,当在cacheFuture.get()阻塞线程(即Map中,存在FutureTask<V>,等待结果,所以get()方法阻塞),

    并抛出CancellationException异常,则会再一次申请一个创建FutureTask的机会。

Java并发编程实战:第5章 构建块

https://osys.github.io/posts/ff33.html

作者

Osys

发布于

2022年08月29日

更新于

2022年08月29日

许可协议

评论