Java并发编程实战:第6章 任务执行

1. 在线程中执行任务

1.1 在线程中执行任务

  • 多数并发程序是围绕任务进行管理的,所谓任务就是抽象、离散的工作单元。
  • 正常情况下,服务器应该兼顾良好的吞吐量和快速的响应性
  • 在负荷状况下,应该平缓的劣化,不应该快速失败,为了达到这些策略,应该有一个明确的任务执行策略。

1.2 顺序地执行任务

顺序化地 Web Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class SingleThreadWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
// 顺序执行,接受连接,处理连接
while (true) {
Socket connection = socket.accept();
handleRequest(connection);
}
}

private static void handleRequest(Socket connection) {
// 处理逻辑
}
}

顺序处理并发性能低,必须等待一个请求结束才能响应下一个请求。其他线程在等待某个请求处理结束时,CPU可能较为空闲,因此导致资源利用率非常低。

1.3 显式的为任务创建线程

Web Server为每个请求启动一个新的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class ThreadPerTaskWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
// 并发处理请求
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
@Override
public void run() {
handleRequest(connection);
}
};
new Thread(task).start();
}
}

private static void handleRequest(Socket connection) {
// 处理逻辑
}
}

主循环为每个连接都创建一个新线程处理请求,而不在主循环的内部处理请求

结论:

  1. 执行任务的负载已经脱离了主线程
  2. 并行处理任务,使得可以多个请求同时得到服务
  3. 任务处理代码必须是线程安全的,因为有多个任务会并发地调用它。

1.4 无限制创建线程的缺点

显式的为任务创建线程实例中,为每个任务都创建一个线程,存在一些实际的缺陷:

  • 线程生命周期的开销。创建和关闭线程都需要借助操作系统,花费大量时间。
  • 资源消耗高。如果可运行的线程数超过可用的处理器数,线程将会空闲。大量空闲线程会占用更多的内存。
  • 稳定性问题。应该限制可创建线程的数目。

通常来说,在一定范围增加创建线程,可以提高系统的吞吐量,一旦超出范围,创建更多线程可能占用过多资源,导致程序出现各种问题。

2. Executor 框架

2.1 Executor 框架

任务是逻辑上的工作单元,线程是使任务异步执行的机制。

1
2
3
4
5
6
7
8
9
10
package java.util.concurrent;

public interface Executor {

/**
* 在将来的某个时间执行给定的命令。该命令可以在新线程、池线程或调用线程中执行,具体取决于执行者实现。
* @param command 可运行的任务
*/
void execute(Runnable command);
}

Executor为任务提交和任务执行提供了解耦的标准方法。

2.2 使用 Executor 实现 Web Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class TaskExecutionWebServer {
private static final int THEAD_NUM = 100;
private static final Executor EXEC = new ThreadPoolExecutor(THEAD_NUM, THEAD_NUM, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100));

public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
@Override
public void run() {
handleRequest(connection);
}
};
EXEC.execute(task);
}
}

private static void handleRequest(Socket connection) {
// 处理逻辑
}
}

通过Executor我们将任务提交执行进行了解耦,代替了硬编码的创建线程。

2.3 执行策略

一个执行策略明确了需要在任务执行过程关注的点:

  • 任务在什么线程执行?
  • 任务以什么方式执行?
  • 可以有多少个任务并发执行?
  • 可以有多少任务进入等待队列?
  • 如果任务过载,需要放弃任务,怎么办?
  • 一个任务执行前后,应该做什么?

执行策略是对资源管理的工具,最佳策略取决于可用的计算资源和你对服务质量的要求。

2.4 线程池

  • 线程池管理工作者线程,帮助我们管理工作线程的创建和销毁,工作队列的处理,可用让我们更加关注任务的编写上。
  • 工作队列:其作用是持有所有等待执行的任务。

使用线程池好处:

  • 重用存在的线程,而不是创建新的线程,这可以在处理多请求时抵消线程创建、消亡产生的开销。
  • 在请求到达时,工作者线程通常已经存在,用于创建线程的等待时间并不会延迟任务的执行,因此提高了响应性。
  • 通过适当地调整线程池的大小,你可以得到足够多的线程以保持处理器忙碌,同时可以还防止过多的线程相互竞争资源,导致应用程序耗尽内存或者失败。

类库Executors提供了我们多种创建线程池的静态方法。

  1. 定长线程池 newCachedThreadPool()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /**
    * 创建一个固定长度的线程池
    * 每次提交一个任务,就创建一个线程,直达达到线程池最大长度
    * 在需要创建线程时,使用提供的ThreadFactory创建新线程
    */
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory);
    }

    使用的工作队列是new LinkedBlockingQueue()也就是工作队列是无限的,最好设置固定大小

  2. 可缓存线程池 newCachedThreadPool()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /**
    * 创建一个可缓存的线程池
    * 如果当前线程池的长度超过了处理的需要时,它可以灵活地回收空闲的线程
    * 当需求增加时,它可以灵活地添加新的线程,而并不会对池的长度作任何限制
    */
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }
  3. 单线程化的 executor newSingleThreadExecutor()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * 创建一个单线程化的 executor
    * 它只创建唯一的工作者线程来执行任务,如果这个线程异常结束,会有另一个取代它
    * executor 会保证任务,依照任务队列所规定的顺序执行。 --- (FIFO, LIFO,优先级)
    */
    public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService(
    new ThreadPoolExecutor(1, 1, 0L,
    TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()
    )
    );
    }
  4. 定时的线程池 newScheduledThreadPool()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * 创建一个定长的线程池
    * 支持定时的,以及周期性的任务执行。
    */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    /**
    * 在需要创建线程时,使用提供的ThreadFactory创建新线程
    */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

2.5 Executor 的生命周期

Executor 本身并没有解决生命周期问题,它的子接口 ExecutorService 提供了一些接口用于解决这个问题:

1

  • shudown() 方法

    1
    2
    3
    4
    5
    6
    /**
    * 调用这个方法时,ExecutorService 停止接受任何新的任务
    * 且等待已经提交的任务执行完成,当所有已经提交的任务执行完毕后将会关闭ExecutorService
    * (已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的)
    */
    void shutdown();
  • shutdownNow() 方法

    1
    2
    3
    4
    5
    6
    /**
    * 调这个方法会强制关闭 ExecutorService,
    * 它将取消所有运行中的任务和在工作队列中等待的任务,
    * 这个方法返回一个 List 列表,列表中返回的是等待在工作队列中的任务
    */
    List<Runnable> shutdownNow();
  • isShutdown() 方法

    1
    2
    /** ExecutorService 关闭后返回true,否则返回false */
    boolean isShutdown();
  • isTerminated() 方法

    1
    2
    3
    4
    5
    /**
    * ExecutorService 关闭后所有任务都已完成,则返回true
    * 注意:除非先调用 shutdown() 或 shutdowNow(),否则 isTerminated 永远不会为 true
    */
    boolean isTerminated();
  • awaitTermination() 方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /**
    * 在执行 shutdown() 后,阻塞 ExecutorService 关闭,
    * 直到所有任务都完成,或者直到 timeout,或者当前线程被中断。
    * 以先发生者为准。
    * @param timeout 超时时间
    * @param unit 超时参数的时间单位。
    * @return 如果 ExecutorService 关闭,return true
    * 如果 timeout,ExecutorService 还未关闭,return false
    * 当前线程被中断,抛出 InterruptedException 异常
    */
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
  • submit() 方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    /**
    * 提交一个任务到 Executor 工作队列中,执行该任务,并返回 Future<T> 结果。
    * 参为 Runnable 时,执行 Runnable 的 run()方法
    * 返回值 Future.get() 的结果为 null。
    * 参为 Runnable 和 result 时,执行 Runnable 的 run()方法
    * 返回值 Future.get() 的结果为传入的 result。
    * 参为 Callable 时,则执行 Callable 的 call() 方法,
    * 返回值 Future.get() 的结果为 call() 的返回值。
    * @param task 两种类型的 task:Callable<T>、Runnable
    * @param result 返回的结果
    * @return 执行后,返回的是 Future<T> 对象
    */
    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);
  • invokeAll() 方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * 提交任务集到 ExecutorService,返回所有任务的执行结果。
    * 该方法会阻塞,必须等待所有的任务执行完成后统一返回。
    *
    * 如果全部任务在指定的时间内没有完成,则抛出异常。
    * @param tasks Collection<? extends Callable<T>>
    * @param timeout 超时时间
    * @param unit 时间单位
    * @return 执行后,返回的是 <T> List<Future<T>> 对象
    */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
  • invokeAny() 方法

    1
    2
    3
    4
    5
    /** 提交任务集到 ExecutorService,返回第一个执行完的任务的结果值 */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

    /** 提交任务集到 ExecutorService,在指定时间内,返回第一个执行完的任务的结果值,否则 Exception */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

支持关闭操作的 Web Server 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class LifecycleWebServer {

private int THREAD_NUM = 10;
/** 创建 ExecutorService */
private final ExecutorService exec = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100));

public void start() throws IOException {
// 创建 socket 连接
ServerSocket socket = new ServerSocket(80);
// ExecutorService 未关闭
while (!exec.isShutdown()) {
try {
// 接受Socket连接
final Socket conn = socket.accept();
// 向 ExecutorService 提交任务
exec.execute(new Runnable() {
@Override
public void run() {
handleRequest(conn);
}
});
} catch (RejectedExecutionException e) {
if (!exec.isShutdown()) {
log("任务提交被拒绝", e);
}
}
}
}

public void stop() {
exec.shutdown();
}

private void log(String msg, Exception e) {
Logger.getAnonymousLogger().log(Level.WARNING, msg, e);
}

/** 处理逻辑 */
void handleRequest(Socket connection) {
Request req = readRequest(connection);
if (isShutdownRequest(req)) {
stop();
} else {
dispatchRequest(req);
}
}

interface Request {
}

private Request readRequest(Socket s) {
return null;
}

private void dispatchRequest(Request r) {
}

private boolean isShutdownRequest(Request r) {
return false;
}
}

该实例,利用 ExecutorService 提供的生命周期管理方法进行处理

2.6 延迟的、周期性的任务

在上面的 线程池 说明中,有描述到:定时的线程池 newScheduledThreadPool()

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 创建一个定长的线程池
* 支持定时的,以及周期性的任务执行。
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
* 在需要创建线程时,使用提供的ThreadFactory创建新线程
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

ScheduledExecutorService 接口继承了 ExecutorService 接口。其接口方法有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* 创建并执行在给定延迟后启用的单次操作。
* command - 要执行的任务
* delay - 从现在开始延迟执行的时间
* unit - 延时参数的时间单位
*
* return 表示任务等待完成,并且其的 ScheduledFuture get()方法将返回 null
*
* Exception
* RejectedExecutionException - 如果任务无法安排执行
* NullPointerException - 如果命令为空
*/
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

/**
* 创建并执行在给定延迟后启用的ScheduledFuture。
*
* callable - 执行的功能
* delay - 从现在开始延迟执行的时间
* unit - 延迟参数的时间单位
*
* return 一个可用于提取结果或取消的ScheduledFuture
*
* Exception
* RejectedExecutionException - 如果该任务无法安排执行
* NullPointerException - 如果可调用为空
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

/**
* 创建并执行在给定的初始延迟之后,随后以给定的时间段首先启用的周期性动作;
* 那就是执行将在 initialDelay 之后开始,然后 initialDelay + period ,然后是initialDelay + (2 * period) ......。
* 如果任务的执行遇到异常,则后续的执行被抑制。 否则,任务将仅通过取消或终止执行人终止。
* 如果任务执行时间比其周期长,则后续执行可能会迟到,但不会同时执行。
*
* command - 要执行的任务
* initialDelay - 延迟第一次执行的时间
* period - 连续执行之间的时期
* unit - initialDelay和period参数的时间单位
*
* return 一个ScheduledFuture代表待完成的任务,其 get()方法将在取消时抛出异常
*
* Exception
* RejectedExecutionException - 如果该任务无法安排执行
* NullPointerException - 如果可调用为空
* IllegalArgumentException - 如果周期小于或等于零
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

/**
* 创建并执行在给定的初始延迟之后,首先启用的定期动作,随后在一个执行的终止和下一个执行的开始之间给定的延迟。
* 如果任务的执行遇到异常,则后续的执行被抑制。 否则,任务将仅通过取消或终止执行人终止。
*
* command - 要执行的任务
* initialDelay - 延迟第一次执行的时间
* period - 连续执行之间的时期
* unit - initialDelay 和 period 参数的时间单位
*
* return 一个 ScheduledFuture 代表待完成的任务,其 get() 方法将在取消时抛出异常
*
* Exception
* RejectedExecutionException - 如果该任务无法安排执行
* NullPointerException - 如果可调用为空
* IllegalArgumentException - 如果周期小于或等于零
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

**对于延迟、周期任务可以考虑使用 ScheduledThreadPoolExecutor **

3. 寻找可强化的并行性

3.1 寻找可强化的并行性

  • Executor 框架让制定一个执行策略变得简单。不过想要使用 Executor,我们还必须能够将任务描述为 Runnable。
  • 在大多数服务端程序中,基本都存在这样一个明显的任务边界:单一的客户请求 为任务边界。
  • 但是,很多客户端程序中,任务边界有时并非如此显而易见。
  • 即使在服务端程序中,一个请求 任务,内部仍然会有可以进一步细化的并行性。

3.2 顺序执行的页面渲染器

  • 顺序处理

    1. 处理 HTML 文档最简单的方法是顺序处理。 当遇到一个文本标签,就将它渲染到图像缓存里;
    2. 当遇到一个图像的引用时,先通过网络获取它,然后也将它渲染到图像缓存里。
    3. 这样的顺序,可能会让用户等待很长时间,指导呈现出所有文本、图像
  • 预留占位符

    1. 先渲染文本元素,并为图像预留出矩形的占位符
    2. 处理文本后,程序返回到开始,并下载图像,将它们绘制到相应的占位符上。

3.3 示例:顺序的渲染页面元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.ArrayList;
import java.util.List;

/**
* Created by osys on 2022/08/28 21:48.
*/
public abstract class SingleThreadRenderer {
void renderPage(CharSequence source) {
// 文本
renderText(source);
// 图像
List<ImageData> imageData = new ArrayList<>();
for (ImageInfo imageInfo : scanForImageInfo(source)) {
imageData.add(imageInfo.downloadImage());
}
for (ImageData data : imageData) {
renderImage(data);
}
}

interface ImageData {
}

interface ImageInfo {
ImageData downloadImage();
}

abstract void renderText(CharSequence s);
abstract List<ImageInfo> scanForImageInfo(CharSequence s);
abstract void renderImage(ImageData i);
}

3.4 可携带结果的任务:CallableFuture

我们知道 Callable 接口有返回值,Runnable 接口没有返回值。

  • 我们可以将 RunnableCallable 提交给 Executor,然后得到一个 Future,用得到的 Future<T> 来获得任务执行的结果,或者取消任务。

    1
    public interface ExecutorService extends Executor {  }

    1

  • 也可以将 Runnable 实例化一个 FutureTask【如下,FutureTask 实现了 Runnable】

    1
    2
    3
    public class FutureTask<V> implements RunnableFuture<V> {  }

    public interface RunnableFuture<V> extends Runnable, Future<V> { }

    FutureTask 既可以提交给 Executor 来执行,又可以直接调用 run() 方法运行。

  • Runnable 或其分支,将其提交给 Executor 执行,Future.get() 的值可以指定,无指定默认为 null

    1
    2
    3
    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);
  • 将 Callable 提交给 Executor 执行,Future.get() 的值为 Callable 执行的返回值

3.5 示例:使用 Future 等待图像下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* Created by osys on 2022/08/28 21:48.
*/
public abstract class FutureRenderer {
/** 创建线程池 */
private final ExecutorService executor = Executors.newCachedThreadPool();

/** 页面资源渲染 */
void renderPage(CharSequence source) {
// 图片信息集合
final List<ImageInfo> imageInfos = scanForImageInfo(source);
// 创建一个 task,该 task 返回【图片数据】集合
Callable<List<ImageData>> task =
new Callable<List<ImageData>>() {
@Override
public List<ImageData> call() {
List<ImageData> result = new ArrayList<>();
// 从【图片信息集】中,下载所有的【图片数据】
for (ImageInfo imageInfo : imageInfos) {
result.add(imageInfo.downloadImage());
}
return result;
}
};
// 将 Callable 提交给 Executor
Future<List<ImageData>> future = executor.submit(task);

// 文本渲染
renderText(source);

try {
// 如果 Executor 已经将 task 执行完成,返回【图片数据】
// 那么将【图片数据】进行渲染
List<ImageData> imageData = future.get();
for (ImageData data : imageData) {
renderImage(data);
}
} catch (InterruptedException e) {
// 中断线程
Thread.currentThread().interrupt();
// 取消任务
future.cancel(true);
} catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e.getCause());
}
}

/** 图片数据 */
interface ImageData {
}

/** 图片信息 */
interface ImageInfo {
/** 下载图片数据 */
ImageData downloadImage();
}

/** 文本渲染 */
abstract void renderText(CharSequence s);

/** 扫描图片 */
abstract List<ImageInfo> scanForImageInfo(CharSequence s);

/** 图片渲染 */
abstract void renderImage(ImageData i);
}


/** Throwable 强制转换为 RuntimeException */
class LaunderThrowable {

/**
* 将未经检查的 Throwable 抛出。
*
* 如果 throwable 是 RuntimeException 则返回 Throwable。
* 如果 throwable 是 Error 则抛出 Error。
* 否者抛出 IllegalStateException。
*/
public static RuntimeException launderThrowable(Throwable throwable) {
if (throwable instanceof RuntimeException) {
return (RuntimeException) throwable;
} else if (throwable instanceof Error) {
throw (Error) throwable;
} else {
throw new IllegalStateException("Not unchecked", throwable);
}
}
}

3.6 ExecutorBlockingQueue 功能的整合 —- CompletionService

CompletionService 整合了 ExecutorBlockingQueue 的功能。下面将从 CompletionService 实现上分析其功能。

CompletionService 将新异步任务的生产与已完成任务的结果消耗相分离

1
2
3
4
5
6
7
8
9
10
11
public interface CompletionService<V> {
Future<V> submit(Callable<V> task);

Future<V> submit(Runnable task, V result);

Future<V> take() throws InterruptedException;

Future<V> poll();

Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

回顾一下 BlockingQueue

1
2
3
4
1. 阻寨队列(Blocking queue)提供了可阻塞的 put 和 take 方法
2. 如果 Queue 己经满了,put 方法会被阻塞,直到有空间可用
3. 如果 Queue是空的,那么 take 方法会被阻塞,直到有元素可用
4. Queue 的长度可以有限,也可以无限,无限的 Queue 永远不会充满,所以它的 put 方法永远不会阻塞

ExecutorCompletionservice 是实现 CompletionService 接口的一个类,并将 计算任务 委托给一个 Executor

1
2
3
4
5
6
7
public class ExecutorCompletionService<V> implements CompletionService<V> {
// 线程池
private final Executor executor;
// 阻塞队列
private final BlockingQueue<Future<V>> completionQueue;
// ... ...
}

ExecutorCompletionservice 的构造函数中创建了一个 BlockingQueue,用来保存完成的结果

1
2
3
4
5
6
7
8
9
public ExecutorCompletionService(Executor executor) {
// ...
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) {
// ...
this.completionQueue = completionQueue;
}

ExecutorCompletionservice 中有一个内部类:QueueingFuture

1
2
3
4
5
6
7
8
9
10
11
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
// 重写了 FutureTask 的 done() 方法
// completionQueue 为 ExecutorCompletionservice 的成员变量
// task 执行完成时会调用 FutureTask 中 done 方法
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}

当向 ExecutorCompletionservice 提交了一个任务后,首先把这个任务包装为一个 QueueingFuture

1
2
3
4
5
6
7
8
/** 其中一个 submit() 方法 */
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
// 执行 QueueingFuture
executor.execute(new QueueingFuture(f));
return f;
}

new QueueingFuture(f) —- 将 task 包装为一个 QueueingFuture。

提交一个 task

  • 如上,我们提交一个 task,该 task 会被包装为一个 QueueingFuture

  • QueueingFuture 是 FutureTask 的子类,且 QueueingFuture 重写了父类的 done() 方法。

    1
    2
    3
    4
    5
    6
    // completionQueue 为 ExecutorCompletionservice 的成员变量
    // private final BlockingQueue<Future<V>> completionQueue;

    protected void done() {
    completionQueue.add(task);
    }
  • task 执行完成时,都会调用 FutureTask 中 done 方法

  • 调用 done 方法,会将执行完 task 后的结果加入到阻塞队列中。

ExecutorCompletionservice 中的 take()poll()方法

1
2
3
4
5
6
7
8
9
10
11
12
/** 删除并获取 */
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
/** 删除并获取 */
public Future<V> poll() {
return completionQueue.poll();
}
/** 删除并获取 */
public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
  • 如上 take()poll()方法,实际上是将获取结果委托给了阻塞队列
  • 在阻塞队列中,如果队列满了,put 方法会被阻塞,直到有空间可用。
  • 如果队列是空的,那么 take 方法会被阻塞,直到有元素可用。

3.7 示例:使用 CompletionService 的页面渲染器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/**
* Created by osys on 2022/08/28 21:48.
*/
public abstract class Renderer {
private final ExecutorService executor;

Renderer(ExecutorService executor) {
// 创建线程池
this.executor = executor;
}

void renderPage(CharSequence source) {
// 图片信息集合
final List<ImageInfo> info = scanForImageInfo(source);
// CompletionService 整合了 Executor 和 BlockingQueue 的功能,
// 创建一个 CompletionService 对象,将新异步任务的生产与已完成任务的结果消耗相分离
CompletionService<ImageData> completionService = new ExecutorCompletionService<>(executor);

for (final ImageInfo imageInfo : info) {
// 提交任务
completionService.submit(new Callable<ImageData>() {
@Override
public ImageData call() {
// 下载图片
return imageInfo.downloadImage();
}
});
}

// 文本渲染
renderText(source);

try {
// 图片渲染
for (int t = 0, n = info.size(); t < n; t++) {
// 在 CompletionService 中,task 会被其交给 Executor 进行执行
// 执行后的的 Future 会放在 BlockingQueue 中,通过 take()、poll() 方法获取
// 如果 Executor 已经将 task 执行完成,返回【图片数据】
// 那么将【图片数据】进行渲染
Future<ImageData> f = completionService.take();
ImageData imageData = f.get();
renderImage(imageData);
}
} catch (InterruptedException e) {
// 中断线程
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e.getCause());
}
}

/** 图片数据 */
interface ImageData {
}

/** 图片信息 */
interface ImageInfo {
/** 下载图片数据 */
ImageData downloadImage();
}

/** 文本渲染 */
abstract void renderText(CharSequence s);

/** 扫描图片 */
abstract List<ImageInfo> scanForImageInfo(CharSequence s);

/** 图片渲染 */
abstract void renderImage(ImageData i);


static class LaunderThrowable {

/**
* 将未经检查的 Throwable 抛出。
*
* 如果 throwable 是 RuntimeException 则返回 Throwable。
* 如果 throwable 是 Error 则抛出 Error。
* 否者抛出 IllegalStateException。
*/
public static RuntimeException launderThrowable(Throwable throwable) {
if (throwable instanceof RuntimeException) {
return (RuntimeException) throwable;
} else if (throwable instanceof Error) {
throw (Error) throwable;
} else {
throw new IllegalStateException("Not unchecked", throwable);
}
}
}
}

3.8 为任务设置时限

  • 有时候如果一个活动无法在某个确定时间内完成,那么它的结果就失效了,此时程序可以放弃该活动。

  • Future<V> 有一个 get() 方法,可以在时限内获取结果,否者就抛出 TimeoutException 异常

    1
    2
    3
    4
    5
    6
    /**
    * 等待计算完成,然后检索其结果。如果超时,则抛出异常
    * timeout - 等待的最长时间
    * unit - 超时参数的时间单位
    */
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

3.9 示例:旅游预订门户网站

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class RenderWithTimeBudget {
private static final Ad DEFAULT_AD = new Ad();
private static final long TIME_BUDGET = 1000;
private static final ExecutorService exec = Executors.newCachedThreadPool();

Page renderPageWithAd() throws InterruptedException {
long endNanos = System.nanoTime() + TIME_BUDGET;
// 提交任务
Future<Ad> adFuture = exec.submit(new FetchAdTask());
// 在等待广告时,进行渲染页面
Page page = renderPageBody();
Ad ad;
try {
long timeLeft = endNanos - System.nanoTime();
// 在规定时间内,获取广告。否则默认没有广告
ad = adFuture.get(timeLeft, NANOSECONDS);
} catch (ExecutionException e) {
ad = DEFAULT_AD;
} catch (TimeoutException e) {
// 没有在规定时间内获取到广告,取消任务
ad = DEFAULT_AD;
adFuture.cancel(true);
}
// 广告渲染
page.setAd(ad);
return page;
}

Page renderPageBody() { return new Page(); }


/** 广告 */
static class Ad {
}

/** 页面 */
static class Page {
public void setAd(Ad ad) { }
}

/** 添加广告的 task */
static class FetchAdTask implements Callable<Ad> {
@Override
public Ad call() {
return new Ad();
}
}

}

3.10 示例:在预订时间内请求旅游报价

ExecutorService.invokeAll() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 执行给定的任务,返回在所有完成结果。
* 通过 Future.isDone() 判断 task 是否已经完成。
* 在规定时间内,如果在任务列表中,还有任务未被执行,那么通过 Future.cancel() 取消任务。
*
* @param tasks - 收集任务
* @param timeout - 等待的最长时间
* @param unit - 超时参数的时间单位
* @return 完成结果。如果操作没有超时,每个任务都会完成。 如果超时,其中一些任务将不会完成。
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit)
throws InterruptedException;

Future 的 isDone() 方法和:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/** 如果任务已完成(正常终止、异常或取消),返回true */
boolean isDone();

/**
* 尝试取消执行此任务。
* 如果任务已完成、被取消、或由于某种无法取消,则此尝试将失败。
*
* 如果成功,并且当 cancel 时,此任务尚未启动,则此任务不应运行。
* 如果任务已经开始,则 mayInterruptIfRunning 参数确定是否中断执行该 task 的线程,以尝试停止该 task
*
* @param mayInterruptIfRunning true如果执行该任务的线程应该被中断; 否则,正在进行的任务被允许完成
* @return 如果任务无法取消,返回 false,否则返回 true。
*/
boolean cancel(boolean mayInterruptIfRunning);

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class TimeBudget {

private static ExecutorService exec = Executors.newCachedThreadPool();

/**
* 获得旅游行情排名
* @param travelInfo 旅游资讯
* @param companies 公司 ---- set集合
* @param ranking 排行
* @param time 超时时间
* @param unit 时间单位
*/
public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo,
Set<TravelCompany> companies,
Comparator<TravelQuote> ranking,
long time,
TimeUnit unit) throws InterruptedException {
// 将【每个公司 + 旅游资讯】封装成 task
List<QuoteTask> tasks = new ArrayList<>();
for (TravelCompany company : companies) {
tasks.add(new QuoteTask(company, travelInfo));
}

// 将所有 task 提交给 ExecutorService
// 所有任务在规定时间内执行完任务
List<Future<TravelQuote>> quoteFutures = exec.invokeAll(tasks, time, unit);

// 报价获取
List<TravelQuote> quotes = new ArrayList<>(tasks.size());
Iterator<QuoteTask> taskIter = tasks.iterator();
for (Future<TravelQuote> quoteFuture : quoteFutures) {
QuoteTask task = taskIter.next();
try {
// 获取报价成功
quotes.add(quoteFuture.get());
} catch (ExecutionException e) {
// 获取报价失败
quotes.add(task.getFailureQuote(e.getCause()));
} catch (CancellationException e) {
// 获取报价超时
quotes.add(task.getTimeoutQuote(e));
}
}

Collections.sort(quotes, ranking);
return quotes;
}

}

class QuoteTask implements Callable<TravelQuote> {
private final TravelCompany company;
private final TravelInfo travelInfo;

public QuoteTask(TravelCompany company, TravelInfo travelInfo) {
this.company = company;
this.travelInfo = travelInfo;
}

/** 获取报价失败 */
TravelQuote getFailureQuote(Throwable t) {
return null;
}

/** 获取报价超时 */
TravelQuote getTimeoutQuote(CancellationException e) {
return null;
}

@Override
public TravelQuote call() throws Exception {
// 获取旅游报价
return company.solicitQuote(travelInfo);
}
}

/** 旅游公司 */
interface TravelCompany {
/** 获取旅游报价 */
TravelQuote solicitQuote(TravelInfo travelInfo) throws Exception;
}

/** 旅游报价 */
interface TravelQuote {
}

/** 旅游资讯 */
interface TravelInfo {
}

Java并发编程实战:第6章 任务执行

https://osys.github.io/posts/e39c.html

作者

Osys

发布于

2022年08月29日

更新于

2022年08月29日

许可协议

评论