1. 在线程中执行任务 1.1 在线程中执行任务
多数并发程序是围绕任务进行管理的,所谓任务
就是抽象、离散的工作单元。
正常情况下,服务器应该兼顾良好的吞吐量和快速的响应性 。
在负荷状况下,应该平缓的劣化 ,不应该快速失败,为了达到这些策略,应该有一个明确的任务执行策略。
1.2 顺序地执行任务 顺序化地 Web Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import java.io.IOException;import java.net.ServerSocket;import java.net.Socket;public class SingleThreadWebServer { public static void main (String[] args) throws IOException { ServerSocket socket = new ServerSocket (80 ); while (true ) { Socket connection = socket.accept(); handleRequest(connection); } } private static void handleRequest (Socket connection) { } }
顺序处理并发性能低
,必须等待一个请求结束才能响应下一个请求。其他线程在等待某个请求处理结束时,CPU可能较为空闲,因此导致资源利用率非常低。
1.3 显式的为任务创建线程 Web Server为每个请求启动一个新的线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import java.io.IOException;import java.net.ServerSocket;import java.net.Socket;public class ThreadPerTaskWebServer { public static void main (String[] args) throws IOException { ServerSocket socket = new ServerSocket (80 ); while (true ) { final Socket connection = socket.accept(); Runnable task = new Runnable () { @Override public void run () { handleRequest(connection); } }; new Thread (task).start(); } } private static void handleRequest (Socket connection) { } }
主循环为每个连接都创建一个新线程
以处理请求
,而不在主循环
的内部处理请求
。
结论:
执行任务的负载已经脱离了主线程
并行处理任务,使得可以多个请求同时得到服务
任务处理代码必须是线程安全的,因为有多个任务会并发地调用它。
1.4 无限制创建线程的缺点 显式的为任务创建线程实例中,为每个任务都创建一个线程,存在一些实际的缺陷:
线程生命周期的开销。 创建和关闭线程都需要借助操作系统,花费大量时间。
资源消耗高。 如果可运行的线程数超过可用的处理器数,线程将会空闲。大量空闲线程会占用更多的内存。
稳定性问题。 应该限制可创建线程的数目。
通常来说,在一定范围增加创建线程,可以提高系统的吞吐量,一旦超出范围,创建更多线程可能占用过多资源,导致程序出现各种问题。
2. Executor 框架 2.1 Executor 框架 任务是逻辑上的工作单元,线程是使任务异步执行的机制。
1 2 3 4 5 6 7 8 9 10 package java.util.concurrent;public interface Executor { void execute (Runnable command) ; }
Executor为任务提交和任务执行提供了解耦的标准方法。
2.2 使用 Executor 实现 Web Server 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import java.io.IOException;import java.net.ServerSocket;import java.net.Socket;import java.util.concurrent.Executor;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class TaskExecutionWebServer { private static final int THEAD_NUM = 100 ; private static final Executor EXEC = new ThreadPoolExecutor (THEAD_NUM, THEAD_NUM, 0 , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>(100 )); public static void main (String[] args) throws IOException { ServerSocket socket = new ServerSocket (80 ); while (true ) { final Socket connection = socket.accept(); Runnable task = new Runnable () { @Override public void run () { handleRequest(connection); } }; EXEC.execute(task); } } private static void handleRequest (Socket connection) { } }
通过Executor我们将任务提交
和执行
进行了解耦,代替了硬编码的创建线程。
2.3 执行策略 一个执行策略明确了需要在任务执行过程关注的点:
任务在什么线程执行?
任务以什么方式执行?
可以有多少个任务并发执行?
可以有多少任务进入等待队列?
如果任务过载,需要放弃任务,怎么办?
一个任务执行前后,应该做什么?
执行策略是对资源管理的工具,最佳策略取决于可用的计算资源和你对服务质量的要求。
2.4 线程池
线程池管理工作者线程,帮助我们管理工作线程的创建和销毁,工作队列的处理,可用让我们更加关注任务的编写上。
工作队列:其作用是持有所有等待执行的任务。
使用线程池好处:
重用存在的线程,而不是创建新的线程,这可以在处理多请求时抵消线程创建、消亡产生的开销。
在请求到达时,工作者线程通常已经存在,用于创建线程的等待时间并不会延迟任务的执行,因此提高了响应性。
通过适当地调整线程池的大小,你可以得到足够多的线程以保持处理器忙碌,同时可以还防止过多的线程相互竞争资源,导致应用程序耗尽内存或者失败。
类库Executors
提供了我们多种创建线程池的静态方法。
定长线程池 newCachedThreadPool()
1 2 3 4 5 6 7 8 9 10 11 public static ExecutorService newFixedThreadPool (int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>(), threadFactory); }
使用的工作队列是new LinkedBlockingQueue()
也就是工作队列是无限的,最好设置固定大小 。
可缓存线程池 newCachedThreadPool()
1 2 3 4 5 6 7 8 9 10 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
单线程化的 executor newSingleThreadExecutor()
1 2 3 4 5 6 7 8 9 10 11 12 13 public static ExecutorService newSingleThreadExecutor () { return new Executors .FinalizableDelegatedExecutorService( new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>() ) ); }
定时的线程池 newScheduledThreadPool()
1 2 3 4 5 6 7 8 9 10 11 12 13 public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize) { return new ScheduledThreadPoolExecutor (corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor (corePoolSize, threadFactory); }
2.5 Executor 的生命周期 Executor 本身并没有解决生命周期问题,它的子接口 ExecutorService
提供了一些接口用于解决这个问题:
shudown()
方法
shutdownNow()
方法
1 2 3 4 5 6 List<Runnable> shutdownNow () ;
isShutdown()
方法
isTerminated()
方法
1 2 3 4 5 boolean isTerminated () ;
awaitTermination()
方法
1 2 3 4 5 6 7 8 9 10 11 boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException;
submit()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <T> Future<T> submit (Callable<T> task) ; <T> Future<T> submit (Runnable task, T result) ; Future<?> submit(Runnable task);
invokeAll()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
invokeAny()
方法
1 2 3 4 5 <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
支持关闭操作的 Web Server 示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 import java.io.IOException;import java.net.ServerSocket;import java.net.Socket;import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.RejectedExecutionException;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.logging.Level;import java.util.logging.Logger;public class LifecycleWebServer { private int THREAD_NUM = 10 ; private final ExecutorService exec = new ThreadPoolExecutor (THREAD_NUM, THREAD_NUM, 0 , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <>(100 )); public void start () throws IOException { ServerSocket socket = new ServerSocket (80 ); while (!exec.isShutdown()) { try { final Socket conn = socket.accept(); exec.execute(new Runnable () { @Override public void run () { handleRequest(conn); } }); } catch (RejectedExecutionException e) { if (!exec.isShutdown()) { log("任务提交被拒绝" , e); } } } } public void stop () { exec.shutdown(); } private void log (String msg, Exception e) { Logger.getAnonymousLogger().log(Level.WARNING, msg, e); } void handleRequest (Socket connection) { Request req = readRequest(connection); if (isShutdownRequest(req)) { stop(); } else { dispatchRequest(req); } } interface Request { } private Request readRequest (Socket s) { return null ; } private void dispatchRequest (Request r) { } private boolean isShutdownRequest (Request r) { return false ; } }
该实例,利用 ExecutorService
提供的生命周期管理方法进行处理
2.6 延迟的、周期性的任务 在上面的 线程池
说明中,有描述到:定时的线程池 newScheduledThreadPool()
1 2 3 4 5 6 7 8 9 10 11 12 13 public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize) { return new ScheduledThreadPoolExecutor (corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor (corePoolSize, threadFactory); }
ScheduledExecutorService
接口继承了 ExecutorService
接口。其接口方法有:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);public <V> ScheduledFuture<V> schedule (Callable<V> callable, long delay, TimeUnit unit) ;public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
**对于延迟、周期任务可以考虑使用 ScheduledThreadPoolExecutor
**
3. 寻找可强化的并行性 3.1 寻找可强化的并行性
Executor 框架让制定一个执行策略变得简单。不过想要使用 Executor,我们还必须能够将任务描述为 Runnable。
在大多数服务端程序中,基本都存在这样一个明显的任务边界:单一的客户请求
为任务边界。
但是,很多客户端程序中,任务边界有时并非如此显而易见。
即使在服务端程序中,一个请求
任务,内部仍然会有可以进一步细化的并行性。
3.2 顺序执行的页面渲染器
顺序处理
处理 HTML 文档最简单的方法是顺序处理
。 当遇到一个文本标签,就将它渲染到图像缓存里;
当遇到一个图像的引用时,先通过网络获取它,然后也将它渲染到图像缓存里。
这样的顺序,可能会让用户等待很长时间,指导呈现出所有文本、图像
预留占位符
先渲染文本元素,并为图像预留出矩形的占位符
处理文本后,程序返回到开始,并下载图像,将它们绘制到相应的占位符上。
3.3 示例:顺序的渲染页面元素 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import java.util.ArrayList;import java.util.List;public abstract class SingleThreadRenderer { void renderPage (CharSequence source) { renderText(source); List<ImageData> imageData = new ArrayList <>(); for (ImageInfo imageInfo : scanForImageInfo(source)) { imageData.add(imageInfo.downloadImage()); } for (ImageData data : imageData) { renderImage(data); } } interface ImageData { } interface ImageInfo { ImageData downloadImage () ; } abstract void renderText (CharSequence s) ; abstract List<ImageInfo> scanForImageInfo (CharSequence s) ; abstract void renderImage (ImageData i) ; }
3.4 可携带结果的任务:Callable
和 Future
我们知道 Callable 接口有返回值,Runnable 接口没有返回值。
我们可以将 Runnable
或 Callable
提交给 Executor
,然后得到一个 Future
,用得到的 Future<T>
来获得任务执行的结果,或者取消任务。
1 public interface ExecutorService extends Executor { }
也可以将 Runnable
实例化一个 FutureTask
【如下,FutureTask 实现了 Runnable】
1 2 3 public class FutureTask <V> implements RunnableFuture <V> { }public interface RunnableFuture <V> extends Runnable , Future<V> { }
FutureTask
既可以提交给 Executor
来执行,又可以直接调用 run()
方法运行。
Runnable 或其分支,将其提交给 Executor
执行,Future.get()
的值可以指定,无指定默认为 null
1 2 3 <T> Future<T> submit (Runnable task, T result) ; Future<?> submit(Runnable task);
将 Callable 提交给 Executor
执行,Future.get()
的值为 Callable 执行的返回值
3.5 示例:使用 Future 等待图像下载 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 import java.util.ArrayList;import java.util.List;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public abstract class FutureRenderer { private final ExecutorService executor = Executors.newCachedThreadPool(); void renderPage (CharSequence source) { final List<ImageInfo> imageInfos = scanForImageInfo(source); Callable<List<ImageData>> task = new Callable <List<ImageData>>() { @Override public List<ImageData> call () { List<ImageData> result = new ArrayList <>(); for (ImageInfo imageInfo : imageInfos) { result.add(imageInfo.downloadImage()); } return result; } }; Future<List<ImageData>> future = executor.submit(task); renderText(source); try { List<ImageData> imageData = future.get(); for (ImageData data : imageData) { renderImage(data); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); future.cancel(true ); } catch (ExecutionException e) { throw LaunderThrowable.launderThrowable(e.getCause()); } } interface ImageData { } interface ImageInfo { ImageData downloadImage () ; } abstract void renderText (CharSequence s) ; abstract List<ImageInfo> scanForImageInfo (CharSequence s) ; abstract void renderImage (ImageData i) ; } class LaunderThrowable { public static RuntimeException launderThrowable (Throwable throwable) { if (throwable instanceof RuntimeException) { return (RuntimeException) throwable; } else if (throwable instanceof Error) { throw (Error) throwable; } else { throw new IllegalStateException ("Not unchecked" , throwable); } } }
3.6 Executor
和 BlockingQueue
功能的整合 —- CompletionService CompletionService
整合了 Executor
和 BlockingQueue
的功能。下面将从 CompletionService
实现上分析其功能。
CompletionService 将新异步任务的生产
与已完成任务的结果消耗
相分离
1 2 3 4 5 6 7 8 9 10 11 public interface CompletionService <V> { Future<V> submit (Callable<V> task) ; Future<V> submit (Runnable task, V result) ; Future<V> take () throws InterruptedException; Future<V> poll () ; Future<V> poll (long timeout, TimeUnit unit) throws InterruptedException; }
回顾一下 BlockingQueue
1 2 3 4 1. 阻寨队列(Blocking queue)提供了可阻塞的 put 和 take 方法 2. 如果 Queue 己经满了,put 方法会被阻塞,直到有空间可用 3. 如果 Queue是空的,那么 take 方法会被阻塞,直到有元素可用 4. Queue 的长度可以有限,也可以无限,无限的 Queue 永远不会充满,所以它的 put 方法永远不会阻塞
ExecutorCompletionservice
是实现 CompletionService
接口的一个类,并将 计算任务 委托给一个 Executor
。
1 2 3 4 5 6 7 public class ExecutorCompletionService <V> implements CompletionService <V> { private final Executor executor; private final BlockingQueue<Future<V>> completionQueue; }
ExecutorCompletionservice 的构造函数中创建了一个 BlockingQueue
,用来保存完成的结果
。
1 2 3 4 5 6 7 8 9 public ExecutorCompletionService (Executor executor) { this .completionQueue = new LinkedBlockingQueue <Future<V>>(); } public ExecutorCompletionService (Executor executor, BlockingQueue<Future<V>> completionQueue) { this .completionQueue = completionQueue; }
在 ExecutorCompletionservice
中有一个内部类:QueueingFuture
1 2 3 4 5 6 7 8 9 10 11 private class QueueingFuture extends FutureTask <Void> { QueueingFuture(RunnableFuture<V> task) { super (task, null ); this .task = task; } protected void done () { completionQueue.add(task); } private final Future<V> task; }
当向 ExecutorCompletionservice
提交了一个任务后,首先把这个任务包装为一个 QueueingFuture
1 2 3 4 5 6 7 8 public Future<V> submit (Callable<V> task) { if (task == null ) throw new NullPointerException (); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture (f)); return f; }
new QueueingFuture(f)
—- 将 task
包装为一个 QueueingFuture。
提交一个 task :
如上,我们提交一个 task
,该 task
会被包装为一个 QueueingFuture
QueueingFuture 是 FutureTask
的子类,且 QueueingFuture 重写了父类的 done() 方法。
1 2 3 4 5 6 protected void done () { completionQueue.add(task); }
task 执行完成时,都会调用 FutureTask 中 done 方法
调用 done 方法,会将执行完 task 后的结果加入到阻塞队列中。
ExecutorCompletionservice
中的 take()
、poll()
方法
1 2 3 4 5 6 7 8 9 10 11 12 public Future<V> take () throws InterruptedException { return completionQueue.take(); } public Future<V> poll () { return completionQueue.poll(); } public Future<V> poll (long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); }
如上 take()
和poll()
方法,实际上是将获取结果委托给了阻塞队列
。
在阻塞队列中,如果队列满了,put 方法会被阻塞,直到有空间可用。
如果队列是空的,那么 take 方法会被阻塞,直到有元素可用。
3.7 示例:使用 CompletionService 的页面渲染器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 import java.util.List;import java.util.concurrent.Callable;import java.util.concurrent.CompletionService;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorCompletionService;import java.util.concurrent.ExecutorService;import java.util.concurrent.Future;public abstract class Renderer { private final ExecutorService executor; Renderer(ExecutorService executor) { this .executor = executor; } void renderPage (CharSequence source) { final List<ImageInfo> info = scanForImageInfo(source); CompletionService<ImageData> completionService = new ExecutorCompletionService <>(executor); for (final ImageInfo imageInfo : info) { completionService.submit(new Callable <ImageData>() { @Override public ImageData call () { return imageInfo.downloadImage(); } }); } renderText(source); try { for (int t = 0 , n = info.size(); t < n; t++) { Future<ImageData> f = completionService.take(); ImageData imageData = f.get(); renderImage(imageData); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { throw LaunderThrowable.launderThrowable(e.getCause()); } } interface ImageData { } interface ImageInfo { ImageData downloadImage () ; } abstract void renderText (CharSequence s) ; abstract List<ImageInfo> scanForImageInfo (CharSequence s) ; abstract void renderImage (ImageData i) ; static class LaunderThrowable { public static RuntimeException launderThrowable (Throwable throwable) { if (throwable instanceof RuntimeException) { return (RuntimeException) throwable; } else if (throwable instanceof Error) { throw (Error) throwable; } else { throw new IllegalStateException ("Not unchecked" , throwable); } } } }
3.8 为任务设置时限
3.9 示例:旅游预订门户网站 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.TimeoutException;import static java.util.concurrent.TimeUnit.NANOSECONDS;public class RenderWithTimeBudget { private static final Ad DEFAULT_AD = new Ad (); private static final long TIME_BUDGET = 1000 ; private static final ExecutorService exec = Executors.newCachedThreadPool(); Page renderPageWithAd () throws InterruptedException { long endNanos = System.nanoTime() + TIME_BUDGET; Future<Ad> adFuture = exec.submit(new FetchAdTask ()); Page page = renderPageBody(); Ad ad; try { long timeLeft = endNanos - System.nanoTime(); ad = adFuture.get(timeLeft, NANOSECONDS); } catch (ExecutionException e) { ad = DEFAULT_AD; } catch (TimeoutException e) { ad = DEFAULT_AD; adFuture.cancel(true ); } page.setAd(ad); return page; } Page renderPageBody () { return new Page (); } static class Ad { } static class Page { public void setAd (Ad ad) { } } static class FetchAdTask implements Callable <Ad> { @Override public Ad call () { return new Ad (); } } }
3.10 示例:在预订时间内请求旅游报价 ExecutorService.invokeAll()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
Future
的 isDone() 方法和:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 boolean isDone () ;boolean cancel (boolean mayInterruptIfRunning) ;
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 import java.util.ArrayList;import java.util.Collections;import java.util.Comparator;import java.util.Iterator;import java.util.List;import java.util.Set;import java.util.concurrent.Callable;import java.util.concurrent.CancellationException;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;public class TimeBudget { private static ExecutorService exec = Executors.newCachedThreadPool(); public List<TravelQuote> getRankedTravelQuotes (TravelInfo travelInfo, Set<TravelCompany> companies, Comparator<TravelQuote> ranking, long time, TimeUnit unit) throws InterruptedException { List<QuoteTask> tasks = new ArrayList <>(); for (TravelCompany company : companies) { tasks.add(new QuoteTask (company, travelInfo)); } List<Future<TravelQuote>> quoteFutures = exec.invokeAll(tasks, time, unit); List<TravelQuote> quotes = new ArrayList <>(tasks.size()); Iterator<QuoteTask> taskIter = tasks.iterator(); for (Future<TravelQuote> quoteFuture : quoteFutures) { QuoteTask task = taskIter.next(); try { quotes.add(quoteFuture.get()); } catch (ExecutionException e) { quotes.add(task.getFailureQuote(e.getCause())); } catch (CancellationException e) { quotes.add(task.getTimeoutQuote(e)); } } Collections.sort(quotes, ranking); return quotes; } } class QuoteTask implements Callable <TravelQuote> { private final TravelCompany company; private final TravelInfo travelInfo; public QuoteTask (TravelCompany company, TravelInfo travelInfo) { this .company = company; this .travelInfo = travelInfo; } TravelQuote getFailureQuote (Throwable t) { return null ; } TravelQuote getTimeoutQuote (CancellationException e) { return null ; } @Override public TravelQuote call () throws Exception { return company.solicitQuote(travelInfo); } } interface TravelCompany { TravelQuote solicitQuote (TravelInfo travelInfo) throws Exception; } interface TravelQuote {} interface TravelInfo {}