Java并发编程实战:第8章 应用线程池

1. 应用线程池

1.1 在任务与执行策略之间的隐性耦合

Executor 框架可以将任务的提交任务的执行策略解耦开来。Executor 框架为制定和修改执行策略提供了相当大的灵活性,但并非所有的任务都能适用所有的执行策略。 有些类型的任 务需要明确地指定执行策略, 包括:

  1. 依赖性任务:简单来说就是,提交的 task 是需要依赖 其它任务 的, task 就类似有某种枷锁一样,浑身不自在。

    • 大多数任务都是独立的,它们不依赖于其他任务的执行时序、 执行结果或其他效果。
    • 当在线程池中执行独立的任务时, 可以随意地改变线程池的大小和配置,这些修改只会对执行性能产生影响。
    • 如果提交给线程池的任务需要依赖其他的任务, 那么就隐含地给执行策略带来了约束, 此时必须小心地维持这些执行策略,以避免产生活跃性问题。
  2. 采用线程封闭机制的任务:

    • 在单线程中的 Executor:能够对并发性做出更强的承诺。 它们能确保任务不会并发地执行, 使我们能够放宽代码对线程安全的要求。对象可以封闭在 task 所在的线程中, 使得在该线程中执行的 task 在访问该对象时不需要同步, 即使这些资源不是线程安全的也没有问题。
    • Executor 从单线程环境改为线程池环境,任务可能会被并发地执行,失去了线程安全性。
  3. 对响应时间敏感的任务:

    • GUI应用程序对于响应时间是敏感的:如果用户在点击按钮后需要很长延迟才能得到可见的反馈, 那么他们会感到不满。

    • 如果将一个运行时间较长的任务提交到单线程的Executor中, 或者将多个运行时间较长的任务提交到一个只包含少量线程的线程池 中, 那么将降低由该Executor管理的服务的响应性。

  4. 使用ThreadLocal的任务:

    • ThreadLocal使每个线程都可以拥有某个变量的一个私有版本。然而,只要条件允许,Executor可以自由地重用这些线程。
    • 在标准的Executor实现中,当执行需求较低时将回收空闲线程,而当需求增加时将添加新的线程,并且如果从任务中抛出了一个未检查异常,那么将用一个新的工作者线程来替代抛出异常的线程。
    • 只有当线程本地值的生命周期受限于任务的生命周期时,在线程池的线程中使用ThreadLocal才有意义。
    • 在线程池的线程中不应该使用 ThreadLocal在任务之间传递值。

只有当任务都是同类型的并且相互独立时,线程池的性能才能达到最佳。如果将运行时间较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则将可能造成拥塞。如果提交的任务依赖于其他任务,那么除非线程池无限大,否则将可能造成死锁

1.1.1 线程饥饿死锁

  • 在多线程中,如果某个线程任务,依赖于其它任务的执行,那么就有可能会产生线程死锁。

  • 在单线程的 Executor 中,如果一个任务将另一个任务提交到同一个 Executor,并且等待这个被提交任务的结果,那么通常会引发死锁。

    第二个任务停留在工作队列中,并等待第一个任务完成,而第一个任务又无法完成,因为它在等待第二个任务的完成。

在单线程化的 Executor 中死锁的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadDeadlock {
// single executor
ExecutorService exec = Executors.newSingleThreadExecutor();

public class LoadFileTask implements Callable<String> {
private final String fileName;

public LoadFileTask(String fileName) {
this.fileName = fileName;
}

public String call() throws Exception {
// 读取文件
return "";
}
}

public class RenderPageTask implements Callable<String> {
public String call() throws Exception {
Future<String> header, footer;
header = exec.submit(new LoadFileTask("header.html"));
footer = exec.submit(new LoadFileTask("footer.html"));
String page = renderBody();

return header.get() + page + footer.get();
}

private String renderBody() {
// 页面渲染
return "";
}
}

public void test() {
RenderPageTask mainTask = new RenderPageTask();
exec.submit(mainTask);
}
}

例子 ThreadDeadlock.RenderPageTask 将两个 LoadFileTask 添加到单例的 ExecutorService 中。在 tast() 中,我们将 RenderPageTask 添加到单例的 ExecutorService 中,那么会出现 mainTask 等待其他两个 LoadFileTask 的结果,久久不能结束,而那两个 LoadFileTask 却需要等待 mainTask 结束才能被单例的 ExecutorService 执行(久久不能开始)。

1.1.2 耗时操作

  • 执行时间较长的任务不仅会造成线程池堵塞,甚至还会增加执行时间较短任务的服务时间。

  • 如果线程池中线程的数量远小于在稳定状态下执行时间较长任务的数量, 那么到最后可能所有的线程都会运行这些执行时间较长的任务, 从而影响整体的响应性。

  • 限定任务等待资源的时间, 而不要无限制地等待。

    例如 Thread.join、BlockingQueue.put、CountDownLatch.await 以及 Selector.select 等。

1.2 定制线程池的大小

在代码中通常不会固定线程池的大小。线程池的长度应该通过某种配置机制来提供,或者根据 Runtime.availableProcessors() 来动态计算。

1
2
3
4
public class Runtime {
// 返回 Java 虚拟机可用的处理器数量。 int > 0
public native int availableProcessors();
}
  • 要设置线程池的大小,需要避免【过大】和【过小】这两种极端情况。
  • 如果线程池过大,那么大量的线程将在相对很少的CPU和内存资源上发生竞争,这不仅会导致更高的内存使用量,而且还可能耗尽资源。
  • 如果线程池过小,那么将导致许多空闲的处理器无法执行工作,从而降低吞吐率。
  • 如果需要执行不同类别的任务,井且它们之间的行为相差很大,那么应该考虑使用多个线程池,从而使每个线程池可以根据各自的工作负载来调整。

大小设定公式

1
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目

1.3 配置 ThreadPoolExecutor

ThreadPoolExecutor 为一些 Executor 提供了基本的实现,这些 Executor 是由 Executors 中 的 newCachedThreadPool、newFixedThreadPool 和 newScheduledThreadExecutor 等工厂方法返回的。

1.3.1 线程的创建与销毁

  • 核心池大小(core pool size)、最大池的大小(maximum pool size)和存活时间(keep-alive time)共同管理着线程的创建与销段。

  • 核心池大小是目标的大小;线程池的实现试图维护池的大小:即使没有任务执行,池的大小也等于核心池的大小,并且直到工作队列充满前,池都不会创建更多的线程。

  • 最大池的大小是可同时活动的线程数的上限。如果一个线程已经闲置的时间超过了存活时间,它将成为一个被回收的候选者,如果当前的池的大小超过了核心池的大小,线程池会终止它。

  • Executors.newFixedThreadPool()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, // 核心池大小
    nThreads, // 最大池的大小
    0L, // 存活时间
    TimeUnit.MILLISECONDS,// 时间单位
    new LinkedBlockingQueue<Runnable>());// 执行任务之前保存任务的队列
    }

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, // 核心池大小
    nThreads, // 最大池的大小
    0L, // 存活时间
    TimeUnit.MILLISECONDS,// 时间单位
    new LinkedBlockingQueue<Runnable>(),// 执行任务之前保存任务的队列
    threadFactory);// Executor创建新线程时使用的工厂
    }
    }
  • ThreadPoolExecutor 其中一个构造方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    /**
    * 使用给定的初始参数创建一个新的ThreadPoolExecutor 。
    * 参数:
    * corePoolSize - 保留在池中的线程数,即使它们是空闲的,除非设置allowCoreThreadTimeOut
    * maximumPoolSize – 池中允许的最大线程数
    * keepAliveTime – 当线程数大于核心时,这是多余的空闲线程在终止前等待新任务的最长时间。
    * unit – keepAliveTime参数的时间单位
    * workQueue – 用于在执行任务之前保存任务的队列。此队列将仅保存由execute方法提交的Runnable任务。
    * threadFactory – 执行器创建新线程时使用的工厂
    * handler – 由于达到线程边界和队列容量而阻塞执行时使用的处理程序
    */
    public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {
    // ......
    }

1.3.2 管理队列任务

  • 有限线程池限制了可以并发执行的任务的数量。

  • 如果新增任务的频率超过了线程池能过处理它们的速度,任务将在队列中等候。

  • 即使通车平均任务新增都很稳定,也难免会出现突然的激增。尽管队列有助于缓和瞬时的任务激增,但是如果任务持续快速地到来,队列中很多任务等待被执行,这可能会耗尽内存。

  • ThreadPoolExecutor 允许提供一个 BlockingQueue 来保存等待执行的任务。 基本的任务排队方法有 3 种: 无界队列有界队列同步移交 (Synchronous Handoff)。

  • 对于庞大或者无限的池,可以使用 SynchronousQueue,完全绕开队列,将任务直接从生产者移交给工作者线程。

    • SynchronousQueue 并不是一个真正的队列,而是一种管理直接在线程间移交信息的机制。
    • 把一个元素放入到 SynchronousQueue 中,必须有另一个线程正在等待接受移交的任务。
    • 如果没有这样一个线程,只要当前池的大小还小于最大值,ThreadPoolBxecutor 就会创建一个新的线程;否则根据饱和策略,任务会被拒绝。
    • 只有当池是无限的,或者可以接受任务被拒绝,SynchronousQueue 才是一个有实际价值的选揮。
  • 对于先进先出的队列(如:LinkedBlockingQueue、ArrayBlockingQueue),都是顺序执行任务的。如果想要控制任务的执行顺序,可以使用优先队列(PriorityBlockingQueue),通过 Comparator 定义任务优先级。

1.3.3 饱和策略

当一个有限队列充满后,饱和策略开始起作用。ThreadPoolExecutor 的饱和策略可以通过 setRejectedExecutionHandler() 来修改。JDK提供的实现有:AbortPolicy、CallerRunsPolicy、DiscardPolicy 和 DiscardOldestPolicy。

  • 中止(Abort)策略:

    默认的饱和策略,该策略将抛出未检查的 RejectedExecutionException。

  • 抛弃最旧的(Discard-Oldest)策略

    该策略则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。

  • 调用者运行(Caller-Runs)策略:

    该策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。

创建一个可变长的线程池,使用受限队列和 “调用者运行” 饱和策略

1
2
3
4
5
6
7
8
9
10
11
12
13
private static final Integer CORE_POOL_SIZE = 1000;
private static final Integer MAXIMUM_POOL_SIZE = 2000;
private static final Long KEEP_ALIVE_TIME = 0L;
private static final Integer CAPACITY = 1000;

ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE, // 保留在池中的线程数
MAXIMUM_POOL_SIZE, // 池中允许的最大线程数
KEEP_ALIVE_TIME, // 多余的空闲线程在终止前等待新任务的最长时间
TimeUnit.MILLISECONDS, // 时间单位
new LinkedBlockingDeque<Runnable>(CAPACITY), // 用于在执行任务之前保存任务的队列
new ThreadPoolExecutor.CallerRunsPolicy() // 调用者运行(Caller-Runs)策略
);

使用 Semaphore 来遏制任务的提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import net.jcip.annotations.ThreadSafe;

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

@ThreadSafe
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;

public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}

public void submitTask(final Runnable command) throws InterruptedException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
}
}
}

1.3.4 线程工厂

  • 每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。
  • 默认的线程工厂方法将创建一个新的、非守护的线程,并且没有特殊的配置。
  • 通过指定一个线程工厂方法,可以定制线程池的配置信息。
  • 在 ThreadFactory 中只定义了一个方法 newThread(),每当线程池需要创建一个新线程时都会调用这个方法。

定制的线程工厂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MyThreadFactory implements ThreadFactory {
private final String poolName;

public MyThreadFactory(String poolName) {
this.poolName = poolName;
}

public Thread newThread(Runnable runnable) {
return new MyAppThread(runnable, poolName);
}
}

自定义的线程基类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class MyAppThread extends Thread {
/** 线程默认名称 */
public static final String DEFAULT_NAME = "MyAppThread";
/** 是否调用 debug 调试生命周期 */
private static volatile boolean debugLifecycle = false;
/** 线程名称的一部分:这里使用递增数命名线程 */
private static final AtomicInteger created = new AtomicInteger();
/** 运行的线程数 */
private static final AtomicInteger alive = new AtomicInteger();
/** 日志 */
private static final Logger log = Logger.getAnonymousLogger();

public MyAppThread(Runnable r) {
this(r, DEFAULT_NAME);
}

public MyAppThread(Runnable runnable, String name) {
// 线程初始化
super(runnable, name + "-" + created.incrementAndGet());
// 未捕获的异常处理
setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
log.log(Level.SEVERE, "未捕获的异常线程:" + t.getName(), e);
}
});
}

public void run() {
// 复制 debug 调试断言,以确保始终一致的值。
boolean debug = debugLifecycle;
if (debug) log.log(Level.FINE, "Created " + getName());
try {
alive.incrementAndGet();
super.run();
} finally {
alive.decrementAndGet();
if (debug) log.log(Level.FINE, "Exiting " + getName());
}
}

public static int getThreadsCreated() {
return created.get();
}

public static int getThreadsAlive() {
return alive.get();
}

public static boolean getDebug() {
return debugLifecycle;
}

public static void setDebug(boolean b) {
debugLifecycle = b;
}
}

1.3.5 构造后再定制 ThreadPoolExecutor

  • 大多数通过构造函数传递给 ThreadPoolExecutor 的参数(比如核心池大小,最大池大小,存活时间,线程工厂和拒绝执行处理器(rejected execution handler)),都可以在创建 Executor 后通过 setters 进行修改。Executors.newSingleThreadExecutor() 创建的 Executor 除外。

    1
    2
    3
    4
    5
    6
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }

    newSingleThreadExecutor() 与其他方法的实现不同,它按 FinalizableDelegatedExecutorService 方式封装的 ExecutorService,而不是原始的 ThreadPoolExecutor。

  • 在 Executors中包含一个 unconfigurableExecutorService() 工厂方法:

    1
    2
    3
    4
    5
    public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
    if (executor == null)
    throw new NullPointerException();
    return new DelegatedExecutorService(executor);
    }

    它返回一个现有的 ExecutorService,并对它进行包装。它只暴露出 ExecutorService 的方法,不能进行进一步的配置。

1.4 扩展 ThreadPoolExecutor

ThreadPoolExecutor 是可扩展的, 它提供了几个 “钩子” 方法让子类去复写:

  • beforeExecute()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /**
    * 在给定线程中,执行给定 Runnable 之前调用的方法。
    * 此方法由将执行 【Runnable r】 的线程 【Thread t】 调用,并可用于重新初始化 ThreadLocals,或执行日志记录
    * 这个实现什么都不做,但可以在子类中定制。
    *
    * 参数:
    * t - 将运行【Runnable r】的线程
    * r - 将要执行的任务
    */
    protected void beforeExecute(Thread t, Runnable r) { }
  • afteExecute()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /**
    * 在完成给定 Runnable 的执行时调用的方法。
    * 如果 【Throwable != null】,则 Throwable 是导致执行突然终止的未捕获的 RuntimeException 或 Error
    * 这个实现什么都不做,但可以在子类中定制。
    *
    * 参数:
    * t - 导致终止的异常,如果执行正常完成,则返回 null
    * r - 已完成的任务
    */
    protected void afterExecute(Runnable r, Throwable t) { }
  • terminated()

    1
    2
    /** Executor 终止时调用的方法。这个实现什么都不做,但可以在子类中定制。 */
    protected void terminated() { }

扩展线程池,以提供日志和计时功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

public class TimingThreadPool extends ThreadPoolExecutor {

public TimingThreadPool() {
super(1, 1, 0L, TimeUnit.SECONDS, null);
}

private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
private final Logger logger = Logger.getLogger(String.valueOf(TimingThreadPool.class));
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();

protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
logger.fine(String.format("Thread %s: start %s", t, r));
startTime.set(System.nanoTime());
}

protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
logger.fine(String.format("Thread %s: end %s, time=%dns",
t, r, taskTime));
} finally {
super.afterExecute(r, t);
}
}

protected void terminated() {
try {
logger.info(String.format("Terminated: avg time=%dns",
totalTime.get() / numTasks.get()));
} finally {
super.terminated();
}
}
}

2. 并行递归算法

2.1 并行递归算法

如果每一个循环的每次迭代都睡觉哦独立的,并且我们不必等待所有的迭代都完成后再一次处理,那么我们可以使用 Executor 把一个顺序的循环转化为并行的循环。

把顺序执行转换为并行执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public abstract class TransformingSequential {

/** 顺序执行 */
void processSequentially(List<Element> elements) {
for (Element e : elements) {
process(e);
}
}

/** 并行执行 */
void processInParallel(Executor exec, List<Element> elements) {
for (final Element e : elements) {
exec.execute(() -> process(e));
}
}

public abstract void process(Element e);

interface Element {
}
}

当每个选代彼此独立,并且完成循环体中每个送代的工作,意义都足够重大,足以弥补管理一个新任务的开销时,这个顺序循环是适合并行化的。

把顺序递归转化为并行递归

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public abstract class TransformingSequential {

/** 顺序递归 */
public <T> void sequentialRecursive(List<Node<T>> nodes,
Collection<T> results) {
for (Node<T> n : nodes) {
results.add(n.compute());
sequentialRecursive(n.getChildren(), results);
}
}

/** 并行递归 */
public <T> void parallelRecursive(final Executor exec,
List<Node<T>> nodes,
final Collection<T> results) {
for (final Node<T> n : nodes) {
exec.execute(() -> results.add(n.compute()));
parallelRecursive(exec, n.getChildren(), results);
}
}

/** 执行并行递归,获取结果 */
public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return resultQueue;
}

interface Node <T> {
T compute();
List<Node<T>> getChildren();
}
}

2.2 示例:谜题框架

“搬箱子” 谜题的抽象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.Set;

/**
* “搬箱子” 谜题的抽象
* @param <P> 位置
* @param <M> 移动
*/
public interface Puzzle <P, M> {
/** 初始化 */
P initialPosition();

/** 移动位置是否为本位置 */
boolean isGoal(P position);

/** 合法移动 */
Set<M> legalMoves(P position);

/** 搬箱子 */
P move(P position, M move);
}

谜题解决者框架的链节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import net.jcip.annotations.Immutable;

import java.util.LinkedList;
import java.util.List;

@Immutable
public class PuzzleNode <P, M> {
/** 位置 */
final P pos;
/** 移动位置 */
final M move;
/** 上一个位置 */
final PuzzleNode<P, M> prev;

public PuzzleNode(P pos, M move, PuzzleNode<P, M> prev) {
this.pos = pos;
this.move = move;
this.prev = prev;
}

/** 链表转集合 */
List<M> asMoveList() {
List<M> solution = new LinkedList<M>();
for (PuzzleNode<P, M> node = this; node.move != null; node = node.prev) {
solution.add(0, node.move);
}
return solution;
}
}

顺序话版的谜题解决者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SequentialPuzzleSolver <P, M> {
private final Puzzle<P, M> puzzle;
private final Set<P> seen = new HashSet<P>();

public SequentialPuzzleSolver(Puzzle<P, M> puzzle) {
this.puzzle = puzzle;
}

/** 解决者 */
public List<M> solve() {
P pos = puzzle.initialPosition();
return search(new PuzzleNode<P, M>(pos, null, null));
}

/** 箱子移动 */
private List<M> search(PuzzleNode<P, M> node) {
// 该位置不存在箱子,或者箱子没有到过这个位置
if (!seen.contains(node.pos)) {
seen.add(node.pos);
// 移动位置是否为本位置
if (puzzle.isGoal(node.pos)) {
return node.asMoveList();
}
// 获取每个合法移动
for (M move : puzzle.legalMoves(node.pos)) {
// 当前箱子位置移动
P pos = puzzle.move(node.pos, move);
// 下一个箱子移动
PuzzleNode<P, M> child = new PuzzleNode<>(pos, move, node);
List<M> result = this.search(child);
if (result != null) {
return result;
}
}
}
return null;
}
}

并发版的谜题解决者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ConcurrentPuzzleSolver <P, M> {

/** 搬箱子谜题 */
private final Puzzle<P, M> puzzle;
private final ExecutorService exec;
/** 箱子是否存在该位置 */
private final ConcurrentMap<P, Boolean> seen;
/** 可携带结果的闭锁 */
protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<>();

public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
this.puzzle = puzzle;
// Return ThreadPoolExecutor Object
this.exec = initThreadPool();
this.seen = new ConcurrentHashMap<P, Boolean>();
if (exec instanceof ThreadPoolExecutor) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) exec;
// 被拒绝任务,它默默地丢弃被拒绝的任务。
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
}
}

private ExecutorService initThreadPool() {
return Executors.newCachedThreadPool();
}

/** 解决者 */
public List<M> solve() throws InterruptedException {
try {
P p = puzzle.initialPosition();
exec.execute(newTask(p, null, null));
// 阻塞,知道找到一个方案
PuzzleNode<P, M> solutionPuzzleNode = solution.getValue();
return (solutionPuzzleNode == null) ? null : solutionPuzzleNode.asMoveList();
} finally {
exec.shutdown();
}
}

protected Runnable newTask(P position, M move, PuzzleNode<P, M> puzzleNode) {
return new SolverTask(position, move, puzzleNode);
}

protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
super(pos, move, prev);
}

public void run() {
// 已找到一个解决方案,或者该位置曾经到达过
if (solution.isSet() || seen.putIfAbsent(pos, true) != null) {
return;
}
// 移动位置是否为本位置
if (!puzzle.isGoal(pos)) {
// 获取每个合法移动
for (M move : puzzle.legalMoves(pos)) {
exec.execute(
// 创建一个新的 task 进行位置移动
newTask(puzzle.move(pos, move),
move,
this)
);
}
} else {
// 移动位置为本位置
solution.setValue(this);
}
}
}
}

ConcurrentPuzzleSolver 使用可携带结果的闭锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@ThreadSafe
public class ValueLatch <T> {
@GuardedBy("this")
private T value = null;
/** 一种同步辅助,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。 */
private final CountDownLatch done = new CountDownLatch(1);

public boolean isSet() {
return (done.getCount() == 0);
}

public synchronized void setValue(T newValue) {
if (!isSet()) {
value = newValue;
// Decrease ---> 【done.getCount() - 1】
done.countDown();
}
}

public T getValue() throws InterruptedException {
// 当前线程等待直到锁存器倒计时到零
done.await();
synchronized (this) {
return value;
}
}
}

Java并发编程实战:第8章 应用线程池

https://osys.github.io/posts/9a9d.html

作者

Osys

发布于

2022年08月29日

更新于

2022年08月29日

许可协议

评论