Java并发编程实战:第12章 测试并发程序

1. 测试正确性

1.1 测试并发程序

并发程序打造可以分为两类:

  1. 安全性测试
  2. 活跃性测试
    • 进展性测试
    • 无进展测试

与活跃性测试相关的事性能测试。性能的衡量:

  • 吞吐量
  • 响应性
  • 可伸缩性

1.2 测试正确性

在为某个并发类设计单元测试时,首先需要执行与测试串行类时相同的分析 —- 找出需要检查的不变性条件和后验条件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

import java.util.concurrent.Semaphore;

@ThreadSafe
public class SemaphoreBoundedBuffer <E> {
/**
* Semaphore ---- 计数信号量
* availableItems ---- 可用items
* availableSpaces ---- 可用空间
*/
private final Semaphore availableItems, availableSpaces;

/** 存放 Element 的数组 */
@GuardedBy("this")
private final E[] items;

/**
* putPosition ---- 存入数组 items 的位置
* takePosition ---- 从数组中拿 Element 的位置
*/
@GuardedBy("this")
private int putPosition = 0, takePosition = 0;

public SemaphoreBoundedBuffer(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException();
}
// 初始化幸好量,可用 items = 0,可用空间 spaces = capacity
availableItems = new Semaphore(0);
availableSpaces = new Semaphore(capacity);
items = (E[]) new Object[capacity];
}

public boolean isEmpty() {
return availableItems.availablePermits() == 0;
}

public boolean isFull() {
return availableSpaces.availablePermits() == 0;
}

/**
* 添加一个元素
* 可用空间 - 1
* 可用元素 + 1
*/
public void put(E x) throws InterruptedException {
availableSpaces.acquire();
doInsert(x);
availableItems.release();
}

/**
* 拿出一个元素
* 可用空间 + 1
* 可用元素 - 1
*/
public E take() throws InterruptedException {
availableItems.acquire();
E item = doExtract();
availableSpaces.release();
return item;
}

/**
* 添加一个元素
* @param x 元素
*/
private synchronized void doInsert(E x) {
int i = putPosition;
items[i] = x;

++i;
if (i == items.length) {
putPosition = 0;
} else {
putPosition = i;
}
}

private synchronized E doExtract() {
int i = takePosition;
E x = items[i];
items[i] = null;

++i;
if (i == items.length) {
takePosition = 0;
} else {
takePosition = i;
}
return x;
}
}

在计数信号量中,许可不会被显式地表现出来,也不会和它所在的线程有任何关联;release 创建许可,acquire消費许可。

1.2.1 基本的单元测试

对 SemaphoreBoundedBuffer 进行基本单元测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TestBoundedBuffer extends TestCase {

@Test
public void testIsEmptyWhenConstructed() {
SemaphoreBoundedBuffer<Integer> bb = new SemaphoreBoundedBuffer<Integer>(10);
assertTrue(bb.isEmpty()); // bb.isEmpty() = true
assertFalse(bb.isFull()); // bb.isFull() = false
}

@Test
public void testIsFullAfterPuts() throws InterruptedException {
SemaphoreBoundedBuffer<Integer> bb = new SemaphoreBoundedBuffer<Integer>(10);
for (int i = 0; i < 10; i++) {
bb.put(i);
}
assertTrue(bb.isFull()); // bb.isFull() = true
assertFalse(bb.isEmpty()); // bb.isEmpty() = false
}

}

1.2.2 测试阻塞操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import junit.framework.TestCase;
import org.junit.Test;

public class TestBoundedBuffer extends TestCase {
private static final long LOCKUP_DETECT_TIMEOUT = 1000;

@Test
public void testTakeBlocksWhenEmpty() {
final SemaphoreBoundedBuffer<Integer> bb = new SemaphoreBoundedBuffer<Integer>(10);
Thread taker = new Thread() {
public void run() {
try {
int unused = bb.take();
fail();
} catch (InterruptedException success) { }
}
};
try {
taker.start();
Thread.sleep(LOCKUP_DETECT_TIMEOUT);
taker.interrupt();
taker.join(LOCKUP_DETECT_TIMEOUT);
assertFalse(taker.isAlive());
} catch (Exception unexpected) {
fail();
}
}

}

如果在某个测试用例创建的辅助线程中发现了一个错误,那么框架通常无法得知与这个线程相关的是哪一个测试,所以需要通过一些工作将成功或失败信息传递回主线程,从而才能将相应的信息报告出来。

1.2.3 测试安全性

  • 要想测试一个并发类在不可预知的并发访问下是否能够正确执行,我们可以安排多个线程对该并发类进行操作,运行一段时间再检查数据是否正常。
  • 在对并发类进行安全性测试时,要关注某些熟悉可能会引发数据异常、导致程序出现问题。简单地识别出这些受检查的属性来,不要在检查这些属性,人为的限制程序的并发性。最好能做到在检查测试的属性时,不需要添加任何同步代码。

SemaphoreBoundedBuffer 的生产者-消费者测试程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import junit.framework.TestCase;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class PutTakeTest extends TestCase {
protected static final ExecutorService pool = Executors.newCachedThreadPool();
protected CyclicBarrier barrier;
protected final SemaphoreBoundedBuffer<Integer> bb;
protected final int nTrials, nPairs;
protected final AtomicInteger putSum = new AtomicInteger(0);
protected final AtomicInteger takeSum = new AtomicInteger(0);

public static void main(String[] args) throws Exception {
new PutTakeTest(10, 10, 100000).test(); // sample parameters
pool.shutdown();
}

public PutTakeTest(int capacity, int npairs, int ntrials) {
this.bb = new SemaphoreBoundedBuffer<Integer>(capacity);
this.nTrials = ntrials;
this.nPairs = npairs;
this.barrier = new CyclicBarrier(npairs * 2 + 1);
}

void test() {
try {
for (int i = 0; i < nPairs; i++) {
pool.execute(new Producer());
pool.execute(new Consumer());
}
barrier.await(); // wait for all threads to be ready
barrier.await(); // wait for all threads to finish
assertEquals(putSum.get(), takeSum.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

static int xorShift(int y) {
y ^= (y << 6);
y ^= (y >>> 21);
y ^= (y << 7);
return y;
}

/** 生产者 */
class Producer implements Runnable {
public void run() {
try {
int seed = (this.hashCode() ^ (int) System.nanoTime());
int sum = 0;
barrier.await();
for (int i = nTrials; i > 0; --i) {
bb.put(seed);
sum += seed;
seed = xorShift(seed);
}
putSum.getAndAdd(sum);
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

/** 消费者 */
class Consumer implements Runnable {
public void run() {
try {
barrier.await();
int sum = 0;
for (int i = nTrials; i > 0; --i) {
sum += bb.take();
}
takeSum.getAndAdd(sum);
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}

1.2.4 测试资源管理

  • 任何持有或管理着其他对象的对象,都应该在不需要某个对象时,放弃该对象的引用。
  • 存储资源泄漏会抑制垃圾回收器回收内存(以及线程、文件句柄、套接字、数据库连接或者其它有限资源),还会导致资源耗尽和应用程序的失败。因此要限制缓存的大小。
  • 对内存不合理的占有,可以简单地通过堆检查工具测试出来。例如 JProfiler 工具。

测试资源泄漏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import junit.framework.TestCase;
import org.junit.Test;

public class TestBoundedBuffer extends TestCase {
private static final int CAPACITY = 10000;
private static final int THRESHOLD = 10000;

public class Big {
double[] data = new double[100000];
}

@Test
public void testLeak() throws InterruptedException {
SemaphoreBoundedBuffer<Big> bb = new SemaphoreBoundedBuffer<>(CAPACITY);
int heapSize1 = snapshotHeap(); // heap 的快照
for (int i = 0; i < CAPACITY; i++) {
bb.put(new Big());
}
for (int i = 0; i < CAPACITY; i++) {
bb.take();
}
int heapSize2 = snapshotHeap(); // heap 的快照
assertTrue(Math.abs(heapSize1 - heapSize2) < THRESHOLD);
}

private int snapshotHeap() {
/* Snapshot heap and return heap size */
return 0;
}

}

1.2.5 使用回调

  • 回调用户提供的代码,有助于创建测试用例;
  • 回调常常发生在一个对象生命周期的己知点上,这些点提供了很好的机会,来断言不变约束。例如,ThreadPoolExecutor 就把调用转到了任务的 Runnable 和 ThreadFactory 上。
  • 测试一个线程池,涉及到对其执行策路的大量要素的测试:当需要时就创建额外的线程,不需要时就不要创建;当需要时就回收空闲线程,等等。创建一个爱盖了所有可能性的全面的测试套件是一件非常好的事情,但是大多数可能性都可以简单地、独立地进行测试。

用于测试 ThreadPoolExecutor 的线程工厂

1
2
3
4
5
6
7
8
9
10
11
12
13
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class TestingThreadFactory implements ThreadFactory {
public final AtomicInteger numCreated = new AtomicInteger();
private final ThreadFactory factory = Executors.defaultThreadFactory();

public Thread newThread(Runnable r) {
numCreated.incrementAndGet();
return factory.newThread(r);
}
}

验证线程池扩展的测试方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import junit.framework.TestCase;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class TestThreadPool extends TestCase {

/** 线程工厂 */
private final TestingThreadFactory threadFactory = new TestingThreadFactory();

public void testPoolExpansion() throws InterruptedException {
int MAX_SIZE = 10;
ExecutorService exec = Executors.newFixedThreadPool(MAX_SIZE, threadFactory);

// 提交 100 个 task 到 execute
for (int i = 0; i < 10 * MAX_SIZE; i++) {
exec.execute(new Runnable() {
public void run() {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
for (int i = 0;
i < 20 && threadFactory.numCreated.get() < MAX_SIZE;
i++) {
Thread.sleep(100);
}
// threadFactory.numCreated.get() = 10
// MAX_SIZE = 10
// junit.framework.AssertionFailedError:
assertEquals(threadFactory.numCreated.get(), MAX_SIZE);
exec.shutdownNow();
}
}

如果核心池大小小于最大值,线程池会在执行的任务增多时相应地增长。

向池提交几个耗时任务,会使池中的执行任务的数量在足够长的时间内都是常量,这就可以进行一些断言,比如测试池是否如期地扩展。

1.2.6 产生更多的交替操作

由于并发代码中发生的错误一般都是低概率事件,所以在测试并发错误时需要反复地执行许多次。

有些方法可以提高发现这些错误的概率,在多处理器系统上,如果处理器的数量少于活动线程的数量,那么与单处理器的系统或者包含多个处理器的系统相比,将能产生更多的交替行为。

同样,如果在不同的处理器数量、操作系统以及处理器架构的系统上进行测试,就可以发现那些在特定运行环境中才会出现的问题。

有一种有用的方法能提高交替操作的数量。以便能更有效的搜索程序的状态空间:在访问共享状态的操作中,使用Thread.yield将产生更多的上下文切换。当代码在访问状态的时候没有使用足够的同步,将存在一些对执行时序敏感的错误,通过在某个操作的执行过程 中调用yield方法,可以将这些错误暴露出来。这种方法需要在测试中添加一些调用并且在正式产品中删除这些调用。

使用 Thread.yield 产生更多的交替操作

1
2
3
4
5
6
7
public synchronized void tranferCredits(Account from,Account to,int amount) {  
from.setBalance(from.getBalance()-amount);
if (random.nextInt(1000)>THRESHOLD) {
Thread.yield();
}
to.setBalance(to.getBalance()+amount);
}

2. 测试性能

2.1 性能测试

  • 性能测试要符合当前程序的应用场景,理想情况下应该反映出被测试对象在应用程序中的实际用法。
  • 性能测试将衡量典型测试用例中的端到端功能。通常,要获得一组合理的使用场景并不容易,理想情况下,在测试中应该反映出被测试对象在应用程序中的实际用法。
  • 根据经验值来调整各种不同的限值,例如线程数量,缓存容量等等,这些限值都依赖于平台特性(如CPU、内存)。

2.2 比较多种算法

  • 测试结果表明,LinkedBlockgingQueue 的可伸缩性要高于 ArrayBlockingQueue。
  • 从测试结果来看,这个结果似乎有些奇怪,链表队列在每次插入元素时,都必须分配一个链表节点对象,这似乎比基于数组的队列相比,链表队列的 put 和 take 等方法支持并发性更高的访问,因为一些优化后的链接队列算法能将队列头节点的更新操作与尾节点的更新操作分享开来。
  • 因此如果算法能通过多执行一些内存分配操作来降低竞争程度,那么这种算法通常具有更高的可伸缩性。

2.3 测量响应性

  • 同一进程可以同时在多个内核上执行(多线程),并且可以在进程之间共享内存。因此无论上下文切换发生什么,缓存同步都是不可避免的。
  • 如果缓存过小,那么将导致非常多的上下文切换次数。频繁的上下文切换,会导致程序吞吐量较为糟糕。
  • 如果线程由于较多的同步条件限制,导致持续的被阻塞,不公平的信号量(Semaphore)能够提供更好的吞吐量(差异性大),公平的信号量提供更低的差异性。

3. 避免性能测试陷阱

3.1 垃圾回收

  • 垃圾回收的时序是不可预知的,在一个测量数据的测试运行中,任何时候垃圾回收器都有可能运行。

  • 多次对程序进行测试时,如果仅在某一两次触发了垃圾回收,这可能会导致最终测试结果有所偏差。

  • 有两种策略可以防止垃圾回收操作对测试结果产生偏差。

    • 第一种策略是,确保垃圾回收操作在测试运行的整个期间都不会执行。

    • 第二种策略是,确保垃圾回收操作在测试期间执行多次,这样测试程序就能充分反映出运行期间的内存分配与垃圾回收等开销。

      (可以在调用JVM时指定-verbose:gc来判断是否执行了垃圾回收操作)

3.2 动态编译

  • HotSpot JVM 结合了字节码解释和动态编译。
  • 当一个类首次被加载后,JVM 会以解释字节码的方式执行。
  • 如果一个方法运行得较为频繁,动态编译器最终会将其挑出来,转成本机代码,当编译完成后,执行方式将由解释执行转换到直接执行。
  • 编译的时机时不可预知的。大多数程序在运行得足够长后,所有频繁执行的代码露肩都会被编译。
  • 在进行性能测试时,可以长时间运行要测试的程序,这样编译过程和解释执行仅仅占总体运行的很小一部分,对结果影响较小。或者先让待测试代码频繁执行,这样代码就会被完全编译,再测试时,即为直接执行。

在运行程序时,使用 -XX: +PrintCompilation,程序会在动态编译时打印出信息。可以通过这条消息来验证动态编译是在测试运行前,而不是在运行过程中。

3.3 不切实际的竞争程度

  • 并发的应用程序总是交替执行两种非常不同的工作:
    1. 访问共享数据,比如从共享工作队列中获取下一个任务。
    2. 线程本地的计算(执行任务,假设任务自身并不访问共享数据)。
  • 依赖于两种工作类型的相关特性,应用程序会经历不同级别的竞争,并表现出不同的性能与伸缩性行为。

如果有N个线程从共享工作队列中获取任务并执行,这些任务都是计算密集型的、耗时的(但并未频繁地跨线程访问数据),这种情况几乎没有竟争;吞吐量只受限于可用的 CPU 资源。另一方面,如果任务的生命周期很短,在工作队列上就会存在大量竞争,此时吞吐量受限于同步的开销。

为了获得有实际意义的结果,并发性能测试,除了需要考虑协调并发的因素,应该尽最去模拟让线程本地的计算由某一个特有的应用程序来完成。如果每个任务在真实应用程序中完成的工作,与测试程序相比,其本质和范围有相当大的不同,那么得到的关于性能瓶颈位置的结论将是毫无根据的。

Java并发编程实战:第12章 测试并发程序

https://osys.github.io/posts/2d61.html

作者

Osys

发布于

2022年08月29日

更新于

2022年08月29日

许可协议

评论