1. 任务取消 1.1 任务取消 外部代码,能够将某个操作正常完成之前,将其置入完成状态,那么这个操作就称为可取消的(Cancellable) 。
取消操作的原因有很多:
用户请求取消。
有时间限制的操作,如超时设定。
应用程序事件。
错误。
关闭。
1.2 示例:使用 volatile 域,保存取消状态 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 import net.jcip.annotations.GuardedBy;import net.jcip.annotations.ThreadSafe;import java.math.BigInteger;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import static java.util.concurrent.TimeUnit.SECONDS;@ThreadSafe public class PrimeGenerator implements Runnable { private static ExecutorService exec = Executors.newCachedThreadPool(); @GuardedBy("this") private final List<BigInteger> primes = new ArrayList <>(); private volatile boolean cancelled; @Override public void run () { BigInteger prime = BigInteger.ONE; while (!cancelled) { prime = prime.nextProbablePrime(); synchronized (this ) { primes.add(prime); } } } public void cancel () { cancelled = true ; } public synchronized List<BigInteger> get () { return new ArrayList <BigInteger>(primes); } static List<BigInteger> aSecondOfPrimes () throws InterruptedException { PrimeGenerator generator = new PrimeGenerator (); exec.execute(generator); try { SECONDS.sleep(1 ); } finally { generator.cancel(); } return generator.get(); } }
Primecenerator 使用了简单的取消策略:客户端代码通过调用 cancel 请求取消,Primecenerator 每次搜索素数前检查是否有取消请求,当发现取消请求时就退出。
1.3 中断
线程中断
是一种协作机制,线程可以通过这种机制来通知另一个线程,告诉它在某些情况下停止当前工作,并转而执行其他的工作。
每个线程都有一个 boolean 类型的中断状态。当中断线程时,这个线程的中断状态将被设置为 true。
线程的中断方法:
1 2 3 4 5 6 7 8 9 public class Thread { public void interrupt () { ... } public boolean isInterrupted () { ... } public static boolean interrupted () { ... } ...... }
调用 interrupt 并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。
中断操作,并不会真正的中断一个正在运行的线程,只是发出了中断请求,线程会在一个合适的时刻中断自己。
1.4 中断策略
1.5 响应中断 在调用可中断的阻塞函数时,有两种实用策略可以处理 InterruptedException
中断异常:
1.6 示例:计时运行 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class TimeRun { private static final Integer CORE_POOL_SIZE = 100 ; private static final ThreadFactory THREAD_FACTORY = Executors.defaultThreadFactory(); private static final ScheduledExecutorService EXEC = new ScheduledThreadPoolExecutor (CORE_POOL_SIZE, THREAD_FACTORY); public static void timedRun (Runnable runnable, long timeout, TimeUnit unit) { final Thread task = Thread.currentThread(); ScheduledFuture<?> schedule = EXEC.schedule(task::interrupt, timeout, unit); EXEC.execute(runnable); } }
1.7 通过 Future 取消任务 Future<V>
可以用来和已经提交的任务进行交互。Future<V>
接口定义了如下几个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public interface Future <V> { boolean cancel (boolean mayInterruptIfRunning) ; boolean isCancelled () ; boolean isDone () ; V get () throws InterruptedException, ExecutionException; V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class FutureTest01 { public static void main (String[] args) throws Exception { ExecutorService threadPool = Executors.newSingleThreadExecutor(); SimpleTask task = new SimpleTask (3_000 ); Future<Double> future = threadPool.submit(task); threadPool.shutdown(); double time = future.get(); System.out.format("任务运行时间: %.3f s\n" , time); } private static final class SimpleTask implements Callable <Double> { private final int sleepTime; public SimpleTask (int sleepTime) { this .sleepTime = sleepTime; } @Override public Double call () throws Exception { double begin = System.nanoTime(); Thread.sleep(sleepTime); double end = System.nanoTime(); double time = (end - begin) / 1E9 ; return time; } } }
如上,创建了一个 SimpleTask
,该 task
运行时会休眠 3000ms,运行总时间会大于3000ms。
测试结果:
在上例基础上,通过 Future.cancel()
取消任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 import java.util.concurrent.Callable;import java.util.concurrent.CancellationException;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class FutureTest02 { public static void main (String[] args) { ExecutorService threadPool = Executors.newSingleThreadExecutor(); SimpleTask task = new SimpleTask (3_000 ); Future<Double> future = threadPool.submit(task); threadPool.shutdown(); cancelTask(future, 2_000 ); try { double time = future.get(); System.out.format("任务运行时间: %.3f s\n" , time); } catch (CancellationException ex) { System.err.println("任务被取消" ); } catch (InterruptedException ex) { System.err.println("当前线程被中断" ); } catch (ExecutionException ex) { System.err.println("任务执行出错" ); } } private static void cancelTask (final Future<?> future, final int delay) { Runnable cancellation = () -> { try { Thread.sleep(delay); future.cancel(true ); } catch (InterruptedException ex) { ex.printStackTrace(System.err); } }; new Thread (cancellation).start(); } private static final class SimpleTask implements Callable <Double> { private final int sleepTime; public SimpleTask (int sleepTime) { this .sleepTime = sleepTime; } @Override public Double call () throws Exception { double begin = System.nanoTime(); Thread.sleep(sleepTime); double end = System.nanoTime(); double time = (end - begin) / 1E9 ; return time; } } }
这里 task 的运行总时间会大于3000ms的,不过在 2 秒之后取消该任务。运行结果:
1.8 调用 Future
的 cancel(true)
不一定能取消正在运行的任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class FutureTest03 { public static void main (String[] args) throws Exception { ExecutorService threadPool = Executors.newSingleThreadExecutor(); long num = 1000000033L ; PrimerTask task = new PrimerTask (num); Future<Boolean> future = threadPool.submit(task); threadPool.shutdown(); boolean result = future.get(); System.out.format("%d 是否为素数? %b\n" , num, result); } private static final class PrimerTask implements Callable <Boolean> { private final long num; public PrimerTask (long num) { this .num = num; } @Override public Boolean call () { double begin = System.nanoTime(); for (long i = 2 ; i < num; i++) { if (num % i == 0 ) { return false ; } } double end = System.nanoTime(); double time = (end - begin) / 1E9 ; System.out.format("任务运行时间: %.3f s\n" , time); return true ; } } }
Output:
1 2 任务运行时间: 15.033 s 1000000033 是否为素数? true
任务的运行时间,大约是15.033 s
在任务运行到 2 秒的时候调用 Future
的 cancel(true)
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 import java.util.concurrent.Callable;import java.util.concurrent.CancellationException;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class FutureTest04 { public static void main (String[] args) throws Exception { ExecutorService threadPool = Executors.newSingleThreadExecutor(); long num = 1000000033L ; PrimerTask task = new PrimerTask (num); Future<Boolean> future = threadPool.submit(task); threadPool.shutdown(); cancelTask(future, 2_000 ); try { boolean result = future.get(); System.out.format("%d 是否为素数? %b\n" , num, result); } catch (CancellationException ex) { System.err.println("任务被取消" ); } catch (InterruptedException ex) { System.err.println("当前线程被中断" ); } catch (ExecutionException ex) { System.err.println("任务执行出错" ); } } private static final class PrimerTask implements Callable <Boolean> { private final long num; public PrimerTask (long num) { this .num = num; } @Override public Boolean call () { double begin = System.nanoTime(); for (long i = 2 ; i < num; i++) { if (num % i == 0 ) { return false ; } } double end = System.nanoTime(); double time = (end - begin) / 1E9 ; System.out.format("任务运行时间: %.3f s\n" , time); return true ; } } private static void cancelTask (final Future<?> future, final int delay) { Runnable cancellation = () -> { try { Thread.sleep(delay); future.cancel(true ); } catch (InterruptedException ex) { ex.printStackTrace(System.err); } }; new Thread (cancellation).start(); } }
Output:
可以发现,虽然我们取消了任务,Future
的 get
方法也对我们的取消做出了响应(即抛出 CancellationException
异常),但是任务并没有停止,而是直到任务运行完毕了,程序才结束。
原因:
如上代码 Future
的 get
方法对我们的取消做出了响应:
1 2 3 catch (CancellationException ex) { System.err.println("任务被取消" ); }
但是任务并没有停止,而是直到任务运行完毕了,程序才结束。
查看一下 FutureTask.cancel()
方法源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public boolean cancel (boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this , stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false ; try { if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null ) t.interrupt(); } finally { UNSAFE.putOrderedInt(this , stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true ; }
Unsafe.class
中的 compareAndSwapInt()
方法和 putOrderedInt()
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public final class Unsafe { public final native boolean compareAndSwapInt (Object var1, long var2, int var4, int var5) ; public native void putOrderedInt (Object var1, long var2, int var4) ; }
从 FutureTask.cancel()
源码可以知道,在运行状态下的任务,如果我们调用 cancel()
方法,传入 true
为参数,那么接下来会调用 interrupt()
方法将线程状态标记为中断状态。
将线程状态标记为中断状态,但不会终止线程,线程还会继续执行。
1.9 对线程中断做出响应 的任务 如上 FutureTest04
,这里对线程中断作出响应操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private static final class PrimerTask implements Callable <Boolean> { private final long num; public PrimerTask (long num) { this .num = num; } @Override public Boolean call () { double begin = System.nanoTime(); for (long i = 2 ; i < num; i++) { if (Thread.currentThread().isInterrupted()) { System.out.println("PrimerTask.call() task 被取消 ------- i = " + i); double end = System.nanoTime(); double time = (end - begin) / 1E9 ; System.out.format("任务运行时间: %.3f s\n" , time); return false ; } if (num % i == 0 ) { return false ; } } double end = System.nanoTime(); double time = (end - begin) / 1E9 ; System.out.format("任务运行时间: %.3f s\n" , time); return true ; } }
Output:
1 2 3 PrimerTask.call() task 被取消 ------- i = 154513461 任务运行时间: 2.151 s 任务被取消
通过 Thread.currentThread().isInterrupted()
方法,我们可以判断任务是否被取消,从而做出相应的取消任务的响应。
1.10 处理不可中断阻塞
并非所有的可阻塞方法或者阻塞机制都能响应中断。如果一个线程由于 执行同步的Socket I/O
或者 等待获得内置锁
而阻塞,那么中断请求只能设置线程的中断状态,除此之外没有其他任何作用。
由于执行不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止这些线程,但这要求我们必须知道线程阻塞的原因:
java.io 包中的同步Socket I/O 。
InputStream.read() 和 OutputStream.write() 等方法都不会响应中断,通过关闭底层的套接字,可以使得由于执行这些方法的线程被阻塞,抛出一个SocketException。
java.nio 包中的同步I/O 。
当中断一个正在 InterruptibleChannel 上等待的线程时,将抛出 ClosedByInterruptException 并关闭链路。
当关闭一个 InterruptibleChannel 时,将导致所有在链路操作上阻塞的线程都抛出 AsynchronousCloseException。
大多数的 Channel 都实现了 InterruptibleChannel。
Selector 的异步I/O 。
如果一个线程在调用 Selector.select() 方法时阻塞了,那么调用 close() 或 wakeup() 方法会使线程抛出ClosedSelectorException 并提前返回。
获得锁 。
如果一个线程由于等待某个内置锁而阻塞,那么将无法响应中断。在 Lock 类中提供了 lockInterruptibly() 方法,该方法允许在等待一个锁的同时仍能响应中断。
1.11 如何封装非标准的取消操作 1.11.1 覆写 interrupt 来封裝非标准取消 在 Thread 中,通过覆写 interrupt 来封裝非标准取消
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import java.io.IOException;import java.io.InputStream;import java.net.Socket;public class ReaderThread extends Thread { private static final int BUFSZ = 512 ; private final Socket socket; private final InputStream in; public ReaderThread (Socket socket) throws IOException { this .socket = socket; this .in = socket.getInputStream(); } public void interrupt () { try { socket.close(); } catch (IOException ignored) { } finally { super .interrupt(); } } public void run () { try { byte [] buf = new byte [BUFSZ]; while (true ) { int count = in.read(buf); if (count < 0 ) { break ; } else if (count > 0 ) { processBuffer(buf, count); } } } catch (IOException e) { } } public void processBuffer (byte [] buf, int count) { } }
1.11.2 用 newTaskFor 封装非标准的取消
newTaskFor 是一个工厂方法,它将创建 Future 来代表任务。
newTaskFor 还能返回一个 RunnableFuture 接口,该接口拓展了 Future 和 Runnable (并由FutureTask实现)。
当把一个 Callable 提交给 ExecutorService 时,submit 方法会返回一个 Future,我们可以通过这个 Future 来取消任务。
通过定制表示任务的 Future 可以改变 Future.cancel 的行为。
使用 newTaskFor 封装非标准的取消:
2. 停止基于线程的服务 2.1 停止基于线程的服务
应用程序通常会创建拥有线程的服务,比如线程池,这些服务的存在时间通常比创建它们的方法存在的时间更长。
线程通过一个 Thread 对象表示,像其它对象一样,线程可以被自由的共享。线程API中并没有关于线程所属权正规的概念。
如果应用程序优雅地退出,这些服务的线程也需要结束。因为没有退出线程惯用的优先方法,它们需要自行结束。
通常我们使用线程池来创建线程和关闭线程。线程池即为线程的拥有者,操作线程、改变优先级等都由线程池来负责。
服务应该提供生命周期方法来关闭自己,并关闭它所拥有的线程。当我们关闭应用程序这个服务时,服务就可以关闭所有的线程了。
ExecutorService 提供了 shutdown()、shutdownNow() 方法,其它持有线程的服务,也应该提供类似的关闭机制。
对于持有线程的服务,主要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。
2.2 示例:日志服务
不支持关闭的生产者-消费者日志服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 import java.io.PrintWriter;import java.io.Writer;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;public class LogWriter { private final BlockingQueue<String> queue; private final LoggerThread logger; private static final int CAPACITY = 1000 ; public LogWriter (Writer writer) { this .queue = new LinkedBlockingQueue <String>(CAPACITY); this .logger = new LoggerThread (writer); } public void start () { logger.start(); } public void log (String msg) throws InterruptedException { queue.put(msg); } private class LoggerThread extends Thread { private final PrintWriter writer; public LoggerThread (Writer writer) { this .writer = new PrintWriter (writer, true ); } public void run () { try { while (true ) { writer.println(queue.take()); } } catch (InterruptedException ignored) { } finally { writer.close(); } } } }
如上 LogWriter 为一个生产者-消费者模型,日志活动被分离到一个单独的日志线程中。产生消息的线程不会将消息直接写入输出流,而是由 LogWriter 通过 BlockingQueue 把这个任务提交给日志线程,并由日志线程写入。
在正常情况下,如何关闭日志线程?
取消一个生产者-消费者活动,既要取消生产者,又要取消消费者。
中断日志线程,应着手处理消费者,但是如果消费者和生产者不在同一个线程(如上LogWriter),生产者线程实现一个“检查在运行”
生产者观察是否被关闭服务,
消费者正常消费消息,如若队列已空(被阻塞),检查是否关闭了服务。
如果关闭了服务,关闭消费者进程,然后关闭生产者进程
如果未关闭服务,继续等待,直到有消息入队
对于生产者而已,可能会存在,队列已满,消息被阻塞,无法入队。
向日志服务添加不可靠的关闭支持
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import java.io.PrintWriter;import java.io.Writer;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;public class LogWriter2 { private final BlockingQueue<String> queue; private final LoggerThread2 logger; private static final int CAPACITY = 1000 ; private boolean isShutdownRequested = false ; public LogWriter2 (Writer writer) { this .queue = new LinkedBlockingQueue <String>(CAPACITY); this .logger = new LoggerThread2 (writer); } public void start () { logger.start(); } public void log (String msg) throws InterruptedException { queue.put(msg); if (!isShutdownRequested) { queue.put(msg); } else { throw new IllegalStateException ("logger is shut down" ); } } public void setShutdownStatus (boolean status) { this .isShutdownRequested = status; } private class LoggerThread2 extends Thread { private final PrintWriter writer; public LoggerThread2 (Writer writer) { this .writer = new PrintWriter (writer, true ); } public void run () { try { while (true ) { writer.println(queue.take()); } } catch (InterruptedException ignored) { } finally { writer.close(); } } } }
这些不能解决最基本的问题,可能会导致失败,创建新日志消息的各个子任务都必须是原子操作。
向 LogWriter 中添加可靠的取消
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 import net.jcip.annotations.GuardedBy;import java.io.PrintWriter;import java.io.Writer;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;public class LogService { private final BlockingQueue<String> queue; private final LoggerThread loggerThread; private final PrintWriter writer; @GuardedBy("this") private boolean isShutdown; @GuardedBy("this") private int reservations; public LogService (Writer writer) { this .queue = new LinkedBlockingQueue <String>(); this .loggerThread = new LoggerThread (); this .writer = new PrintWriter (writer); } public void start () { loggerThread.start(); } public void stop () { synchronized (this ) { isShutdown = true ; } loggerThread.interrupt(); } public void log (String msg) throws InterruptedException { synchronized (this ) { if (isShutdown) { throw new IllegalStateException ("LogService Is Shut Down" ); } ++ reservations; } queue.put(msg); } private class LoggerThread extends Thread { public void run () { try { while (true ) { try { synchronized (LogService.this ) { if (!isShutdown || reservations != 0 ) { } else { break ; } } String msg = queue.take(); synchronized (LogService.this ) { --reservations; } writer.println(msg); } catch (InterruptedException e) { } } } finally { writer.close(); } } } }
2.3 关闭 ExecutorService 在 ExecutorService 中提供了 shutdown()、shutdownNow() 方法对其进行关闭:
1 2 3 4 5 6 7 8 9 10 11 12 13 void shutdown () ;List<Runnable> shutdownNow () ;
使用 ExcutorService 的日志服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 import java.io.PrintWriter;import java.io.Writer;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.RejectedExecutionException;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class LogService2 { private int THREAD_NUM = 10 ; private final ExecutorService exec = new ThreadPoolExecutor (THREAD_NUM, THREAD_NUM, 0 , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <>(CAPACITY)); private final long TIMEOUT = 10000 ; private final TimeUnit UNIT = TimeUnit.MILLISECONDS; private final WriteTask writer; private final BlockingQueue<String> queue; private static final int CAPACITY = 1000 ; public LogService2 (Writer writer) { this .writer = new WriteTask (writer); this .queue = new LinkedBlockingQueue <String>(CAPACITY); } public void start () {} public void stop () throws InterruptedException { try { exec.shutdown(); exec.awaitTermination(TIMEOUT, UNIT); } finally { writer.close(); } } public void log (String msg) { try { queue.put(msg); exec.execute(writer); } catch (RejectedExecutionException ignored) { } catch (InterruptedException e) { throw new RuntimeException (e); } } private class WriteTask implements Runnable { private final PrintWriter writer; public WriteTask (Writer writer) { this .writer = (PrintWriter) writer; } public void close () { this .writer.close(); } @Override public void run () { try { while (true ) { writer.println(LogService2.this .queue.take()); } } catch (InterruptedException ignored) { } finally { writer.close(); } } } }
2.4 致命药丸 致命药丸:一个置于队列中,可识别的对象。
我们可以使用 致命药丸
来关闭生产者-消费者服务。
使用致命药丸来关闭
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 import java.io.File;import java.io.FileFilter;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;public class IndexingService { private static final int CAPACITY = 1000 ; private static final File POISON = new File ("" ); private final IndexerThread consumer = new IndexerThread (); private final CrawlerThread producer = new CrawlerThread (); private final BlockingQueue<File> queue; private final FileFilter fileFilter; private final File root; public IndexingService (File root, final FileFilter fileFilter) { this .root = root; this .queue = new LinkedBlockingQueue <File>(CAPACITY); this .fileFilter = new FileFilter () { public boolean accept (File pathname) { return pathname.isDirectory() || fileFilter.accept(pathname); } }; } private boolean alreadyIndexed (File f) { return false ; } public void start () { producer.start(); consumer.start(); } public void stop () { producer.interrupt(); } public void awaitTermination () throws InterruptedException { consumer.join(); } class CrawlerThread extends Thread { public void run () { try { crawl(root); } catch (InterruptedException e) { } finally { while (true ) { try { queue.put(POISON); break ; } catch (InterruptedException e1) {} } } } private void crawl (File root) throws InterruptedException { File[] entries = root.listFiles(fileFilter); if (entries != null ) { for (File entry : entries) { if (entry.isDirectory()) { crawl(entry); } else if (!alreadyIndexed(entry)) { queue.put(entry); } } } } } class IndexerThread extends Thread { public void run () { try { while (true ) { File file = queue.take(); if (file == POISON) break ; else indexFile(file); } } catch (InterruptedException consumed) { } } public void indexFile (File file) { }; } }
2.5 示例:只执行一次的服务 如果一个方法需要处理一批任务,并在所有任务结束前不会返回,那么它可以通过使用私有的 Executor 来简化服务的生命周期管理,其中 Executor 的寿命限定在该方法中 (在这种情况下,通常会用到 invokeAll
和 invokeAny
方法)。
invokeAll()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
invokeAny()
方法
1 2 3 4 5 <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
在多台主机上并行的检查新邮件
使用私有 Executor,将它的寿命限定于一次方法调用中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import java.util.Set;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicBoolean;public class CheckForMail { public boolean checkMail (Set<String> hosts, long timeout, TimeUnit unit) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); final AtomicBoolean hasNewMail = new AtomicBoolean (false ); try { for (final String host : hosts) { exec.execute(new Runnable () { public void run () { if (checkMail(host)) { hasNewMail.set(true ); } } }); } } finally { exec.shutdown(); exec.awaitTermination(timeout, unit); } return hasNewMail.get(); } private boolean checkMail (String host) { return false ; } }
2.6 shutdownNow() 的局限性 1 2 3 4 5 6 List<Runnable> shutdownNow () ;
当通过 shutdownNow
来强行关闭 ExecutorService
时,它会尝试取消正在运行的任务
并返回所有已提交但尚未开始的任务
。
我们无法通过常规方法来找出哪些任务已经开始但尚未结束
。这意味着我们无法在关闭过程中知道正在执行的任务的状态,除非任务本身会执行某种检查。
关闭之后,ExecutorService 获取被取消的任务
TrackingExecutor
中给出了如何在关闭过程中判断正在执行的任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 import java.util.ArrayList;import java.util.Collections;import java.util.HashSet;import java.util.List;import java.util.Set;import java.util.concurrent.AbstractExecutorService;import java.util.concurrent.ExecutorService;import java.util.concurrent.TimeUnit;public class TrackingExecutor extends AbstractExecutorService { private final ExecutorService exec; private final Set<Runnable> tasksCancelledAtShutdown = Collections.synchronizedSet(new HashSet <Runnable>()); public TrackingExecutor (ExecutorService exec) { this .exec = exec; } public void shutdown () { exec.shutdown(); } public List<Runnable> shutdownNow () { return exec.shutdownNow(); } public boolean isShutdown () { return exec.isShutdown(); } public boolean isTerminated () { return exec.isTerminated(); } public boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException { return exec.awaitTermination(timeout, unit); } public List<Runnable> getCancelledTasks () { if (!exec.isTerminated()) { throw new IllegalStateException ("线程池未关闭" ); } return new ArrayList <Runnable>(tasksCancelledAtShutdown); } public void execute (final Runnable runnable) { exec.execute(new Runnable () { public void run () { try { runnable.run(); } finally { if (isShutdown() && Thread.currentThread().isInterrupted()) { tasksCancelledAtShutdown.add(runnable); } } } }); } }
使用 TrackingExecutorService 为后续执行来保存未完成的任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 import net.jcip.annotations.GuardedBy;import java.net.URL;import java.util.ArrayList;import java.util.Collections;import java.util.HashSet;import java.util.List;import java.util.Set;import java.util.concurrent.AbstractExecutorService;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentMap;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import static java.util.concurrent.TimeUnit.MILLISECONDS;public abstract class WebCrawler { private volatile TrackingExecutor exec; @GuardedBy("this") private final Set<URL> urlsToCrawl = new HashSet <URL>(); private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap <>(); private static final long TIMEOUT = 500 ; private static final TimeUnit UNIT = MILLISECONDS; public WebCrawler (URL startUrl) { urlsToCrawl.add(startUrl); } public synchronized void start () { exec = new TrackingExecutor (Executors.newCachedThreadPool()); for (URL url : urlsToCrawl) { submitCrawlTask(url); } urlsToCrawl.clear(); } public synchronized void stop () throws InterruptedException { try { saveUnCrawled(exec.shutdownNow()); if (exec.awaitTermination(TIMEOUT, UNIT)) { saveUnCrawled(exec.getCancelledTasks()); } } finally { exec = null ; } } private void saveUnCrawled (List<Runnable> unCrawled) { for (Runnable task : unCrawled) { urlsToCrawl.add(((CrawlTask) task).getPage()); } } private void submitCrawlTask (URL u) { exec.execute(new CrawlTask (u)); } private class CrawlTask implements Runnable { private final URL url; CrawlTask(URL url) { this .url = url; } private int count = 1 ; boolean alreadyCrawled () { return seen.putIfAbsent(url, true ) != null ; } void markUnCrawled () { seen.remove(url); System.out.printf("marking %s unCrawled%n" , url); } public void run () { for (URL link : processPage(url)) { if (Thread.currentThread().isInterrupted()) { return ; } submitCrawlTask(link); } } public URL getPage () { return url; } } protected abstract List<URL> processPage (URL url) ; private class TrackingExecutor extends AbstractExecutorService { private final ExecutorService exec; private final Set<Runnable> tasksCancelledAtShutdown = Collections.synchronizedSet(new HashSet <Runnable>()); public TrackingExecutor (ExecutorService exec) { this .exec = exec; } public void shutdown () { exec.shutdown(); } public List<Runnable> shutdownNow () { return exec.shutdownNow(); } public boolean isShutdown () { return exec.isShutdown(); } public boolean isTerminated () { return exec.isTerminated(); } public boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException { return exec.awaitTermination(timeout, unit); } public List<Runnable> getCancelledTasks () { if (!exec.isTerminated()) { throw new IllegalStateException ("线程池未关闭" ); } return new ArrayList <Runnable>(tasksCancelledAtShutdown); } public void execute (final Runnable runnable) { exec.execute(new Runnable () { public void run () { try { runnable.run(); } finally { if (isShutdown() && Thread.currentThread().isInterrupted()) { tasksCancelledAtShutdown.add(runnable); } } } }); } } }
3. 处理反常的线程终止 3.1 处理反常的线程终止
当并发程序中的某个线程发生故障使控制台中可能会输出栈追踪信息,但是没有人会观察控制台。
此外,当线程发生故障时,应用程序可能看起来仍然在工作,所以这个失败很可能被忽略。
幸运的是,我们有可以监测并防止程序中“遗漏”线程的方法。
导致线程死亡的最主要原因就是RuntimeException。这是 unchecked
异常,程序默认会在控制台输出栈追踪信息,并终止线程。
典型的线程池工作者线程结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void run () { Throwable thrown = null ; try { while (!isInterrupted()) { runTask(getTaskFromWorkueue()); } } catch (Throwable e) { thrown = e; } finally { threadExited(this , throw ); } }
3.2 未捕获异常的处理 Thread
api 中同样提供了 UncaughtExceptionHandler
,它能检测出某个线程由于未捕获的异常而终结的情况。
1 2 3 4 5 6 7 8 9 10 @FunctionalInterface public interface UncaughtExceptionHandler { void uncaughtException (Thread t, Throwable e) ; }
当一个线程由于未捕获异常而退出时,JVM 会把这个事件报告给应用程序提供的 UncaughtExceptionHandler
异常处理器。如果没有提供任何异常处理器,那么默认的行为是将栈追踪信息输出到 System.err
。
异常处理器如何处理未捕获异常,取决于对服务质量的需求。最常见的响应方式是将一个错误信息以及相应的栈追踪信息写入应用程序日志中。
1 2 3 4 5 6 7 8 9 10 11 12 import java.util.logging.Level;import java.util.logging.Logger;public class UEHLogger implements Thread .UncaughtExceptionHandler { public void uncaughtException (Thread thread, Throwable throwable) { Logger logger = Logger.getAnonymousLogger(); logger.log(Level.SEVERE, "线程异常终止: " + thread.getName(), throwable); } }
在运行时间较长的应用程序中,通常会为所有线程的未捕获异常指定同一个异常处理器,并且该处理器至少会将异常信息记录到日志中。
要为线程池中的所有线程设置一个 UncaughtExceptionHandler
,需要为 ThreadPoolExecutor
的构造函数提供一个 ThreadFactory
。
1 2 3 4 5 6 7 8 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {}
标准线程池允许当发生未捕获异常时结束线程,但由于使用了一个 try-finally
代码来接收通知,因此当线程结束时,将有新的线程来代替它。
如果没有提供捕获异常处理器或者其他的故障通知机制,那么任务会悄悄失败,从而导致极大的混乱。
如果你希望在任务由于发送异常而失败时获得通知并且执行一些特定于任务的恢复操作,那么可以将任务封装在能捕获异常的 Runnable
或 Callable
中,或者改写 ThreadPoolExecutor.afterExecute()
方法。
1 protected void afterExecute (Runnable r, Throwable t) { }
只有通过 execute()
提交的任务,才能将它抛出的异常交给未捕获异常处理器,而通过 submit()
提交的任务,会被封装成ExecutionException 抛出。
1 2 3 4 5 6 public void execute (Runnable command) {}
ThreadPoolExecutor 重写了父接口 Executor
的 execute()
方法。而 submit()
方法是父接口 AbstractExecutorService
的,且 ThreadPoolExecutor 未将其重写并交予RejectedExecutionHandler处理。
4. JVM关闭 4.1 JVM 关闭
JVM既可以正常关闭也可以强行关闭。
正常关闭的触发方式有多种,包括:当最后一个“非守护“线程结束时,或者调用System.exit时,或者通过其他特定平台的方法关闭时(如:Crtl+C
、发送signet信号 等)
4.2 关闭钩子 关闭钩子
:是指通过 Runtime.addShutdownHook
注册的但尚未开始的线程。
在正常关闭中,JVM首先调用所有已注册的关闭钩子(Shutdown hook)。
JVM并不能保证关闭钩子的调用顺序。
在关闭应用程序线程中,如果线程仍然在运行,那么这些线程接下来和关闭进程并发执行。
如果 runFinalizerOnExit
为 true
。那么JVM将运行终结器,然后再停止。
JVM并不会停止或中断任何在关闭时仍然运行的应用程序线程。当JVM最终结束时,这些线程将被强行结束。
如果关闭钩子或终结器没有执行完成,那么正常关闭进程“挂起”并且JVM必须被强行关闭。
当强行关闭时,只是关闭JVM,而不会运行关闭钩子。
关闭钩子应该是线程安全的:
它们在访问共享数据时必须使用同步机制,并且小心的避免死锁,这和其他并发代码的要求相同。
而且,关闭钩子不应该对应用程序的状态或者JVM的关闭原因作出任何假设。
最后,关闭钩子应该尽快退出,因为它们会延迟JVM的结束时间,而用户可能希望JVM尽快终止。
关闭钩子可以用于实现服务或应用程序的清理工作,例如清理临时文件。
由于关闭钩子将并发执行,因此在关闭日志文件时可能导致其他需要日志服务的关闭钩子产生问题。实现这种功能的一种方式是对所有服务使用同一个关闭钩子,并且在关闭钩子中执行一系列的关闭操作。
注册关闭钩子来停止日志服务
1 2 3 4 5 6 7 8 9 10 11 public void start () { Runtime.getRuntime().addShutdownHook(new Thread (){ public void run () { try { LogService.this .stop(); } catch (InterruptedException ignored){ } } } }
4.3 守护线程
线程分为两种:普通线程
和 守护线程
。
在JVM启动时创建的所有线程中,除了主线程,其他都是守护线程(例如GC等)。
普通线程和守护线程的差异仅仅在于当线程退出时发生的操作。当一个线程退出时,JVM会检查其他正在运行的线程,如果这些线程都是守护线程,JVM会正常退出操作,当JVM停止时,所有仍然存在的守护线程都将被抛弃 —- 既不会执行 finally 代码块,也不会执行回卷栈,而JVM只是直接退出。
此外,守护线程通常不能用来代替应用程序管理程序中各个服务的生命周期。
4.4 终结器 Finalize
由于 终结器
可以在某个由JVM管理的线程中运行因此终结器访问的任何状态都可能被多个线程访问,这样就必须对其访问操作进行同步。
在大多数情况下,通过 finally 代码块
和 显式的 close()
方法,能够比使用终结器更好的管理资源。唯一的例外是:当需要管理对象,并且该对象持有的资源是通过本地方法获得的。
避免使用终结器。