Java并发编程实战:第7章 取消和关闭

1. 任务取消

1.1 任务取消

外部代码,能够将某个操作正常完成之前,将其置入完成状态,那么这个操作就称为可取消的(Cancellable)

取消操作的原因有很多:

  1. 用户请求取消。
  2. 有时间限制的操作,如超时设定。
  3. 应用程序事件。
  4. 错误。
  5. 关闭。

1.2 示例:使用 volatile 域,保存取消状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
* 生成素数
*/
@ThreadSafe
public class PrimeGenerator implements Runnable {
private static ExecutorService exec = Executors.newCachedThreadPool();

@GuardedBy("this")
private final List<BigInteger> primes = new ArrayList<>();

private volatile boolean cancelled;

@Override
public void run() {
BigInteger prime = BigInteger.ONE;
while (!cancelled) {
// 返回大于此 prime 且可能是素数的第一个整数
prime = prime.nextProbablePrime();
synchronized (this) {
primes.add(prime);
}
}
}

public void cancel() {
cancelled = true;
}

public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}

/**
* 生成【素数】的程序运行一秒钟
* @return 素数
*/
static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
PrimeGenerator generator = new PrimeGenerator();
exec.execute(generator);
try {
// 主线程sleep1秒
SECONDS.sleep(1);
} finally {
// 暂停生成素数的线程
generator.cancel();
}
return generator.get();
}
}

Primecenerator 使用了简单的取消策略:客户端代码通过调用 cancel 请求取消,Primecenerator 每次搜索素数前检查是否有取消请求,当发现取消请求时就退出。

1.3 中断

  • 线程中断 是一种协作机制,线程可以通过这种机制来通知另一个线程,告诉它在某些情况下停止当前工作,并转而执行其他的工作。
  • 每个线程都有一个 boolean 类型的中断状态。当中断线程时,这个线程的中断状态将被设置为 true。

线程的中断方法:

1
2
3
4
5
6
7
8
9
public class Thread {
/** 中断这个线程 */
public void interrupt() { ... }
/** 获取此线程是否已被中断。线程的中断状态不受此方法的影响。 */
public boolean isInterrupted() { ... }
/** 获取当前线程是否被中断。通过该方法清除线程的中断状态。换句话说,如果这个方法被连续调用两次,第二次调用将返回 false */
public static boolean interrupted(){ ... }
......
}

调用 interrupt 并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。

中断操作,并不会真正的中断一个正在运行的线程,只是发出了中断请求,线程会在一个合适的时刻中断自己。

1.4 中断策略

  • 中断策略 规定线程如何解释某个 中断请求

  • 取消操作:中断策略 规定 线程级取消操作 或者 服务级取消操作

    1
    2
    1. 尽快退出,必要时进行清理
    2. 通知线程所有者,该线程已经退出
  • 此外还可以建立其他的中断策略,如 暂停 服务、重新开始 服务。

1.5 响应中断

在调用可中断的阻塞函数时,有两种实用策略可以处理 InterruptedException 中断异常:

  • 传递异常,使方法成为可中断的阻塞方法。如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public interface BlockingQueue<E> extends Queue<E> {
    // 将指定元素插入此队列。队列满了,则等待有位置再插入
    void put(E e) throws InterruptedException;
    }

    public class Thread implements Runnable {
    // 使当前执行的线程在指定的毫秒数内休眠
    public static native void sleep(long millis) throws InterruptedException;
    }

    // BlockingQueue.put() 方法和 Thread.sleep() 方法,都传递了异常 ---- InterruptedException
    // 如果任何线程中断了当前线程。抛出此异常时清除当前线程的中断状态
  • 保存中断状态,上层调用栈中的代码,能够对线程的中断状态进行处理。如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public class Thread implements Runnable {
    // 中断线程,只是将线程状态标记为中断状态,但不会终止线程,线程还会继续执行
    public void interrupt() {
    // ...
    }

    // 测试当前线程是否被中断。通过该方法清除线程的中断状态。
    public static boolean interrupted() {
    return currentThread().isInterrupted(true);
    }

    // 测试某个线程是否已被中断。根据传递的 ClearInterrupted 的值,是否重置中断状态。
    private native boolean isInterrupted(boolean ClearInterrupted);
    }

1.6 示例:计时运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class TimeRun {
private static final Integer CORE_POOL_SIZE = 100;
private static final ThreadFactory THREAD_FACTORY = Executors.defaultThreadFactory();
private static final ScheduledExecutorService EXEC = new ScheduledThreadPoolExecutor(CORE_POOL_SIZE, THREAD_FACTORY);

/** 外部线程中,安排中断 */
public static void timedRun(Runnable runnable, long timeout, TimeUnit unit) {
// 获取调用者线程
final Thread task = Thread.currentThread();
// 在一定时间后,将【调用者线程】标记为中断状态
ScheduledFuture<?> schedule = EXEC.schedule(task::interrupt, timeout, unit);
// 执行传入的 runnable 线程
EXEC.execute(runnable);
}
}
  • 在中断线程之前,应该了解它的中断策略

  • 由于 timedRun 可以从任意一个线程中调用,因此它无法知道这个调用线程的中断策略。例如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.TimeUnit;

    public class TimedRun0 {
    public static void main(String[] args) throws InterruptedException {
    // 创建一个新的 task
    FutureTask<String> runImpl = new FutureTask<>(new RunImpl(), "runImpl");
    // 一定时间后,中断主线程,即 将main线程设为中断状态
    TimeRun.timedRun(runImpl, 1, TimeUnit.MILLISECONDS);
    // 调用者执行自己的逻辑
    if (Thread.currentThread().isInterrupted()) {
    System.out.println("【main】线程执行了一些逻辑。 ------------------【main】计算结果 = 123");
    } else {
    System.out.println("【main】线程执行了一些逻辑。 ==================【main】计算结果 = abc");
    }
    }
    }


    /** 测试 runnable */
    class RunImpl implements Runnable {
    @Override
    public void run() {}
    }

    结果可能是:

    1
    【main】线程执行了一些逻辑。 ------------------【main】计算结果 = 123
    1
    【main】线程执行了一些逻辑。 ==================【main】计算结果 = abc

1.7 通过 Future 取消任务

Future<V> 可以用来和已经提交的任务进行交互。Future<V> 接口定义了如下几个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface Future<V> {

// 尝试取消执行此任务
boolean cancel(boolean mayInterruptIfRunning);

// 如果此任务在正常完成之前被取消,则返回 true
boolean isCancelled();

// 如果任务已完成返回 true。
boolean isDone();

// 等待计算完成,然后检索其结果。
V get() throws InterruptedException, ExecutionException;

// 如果需要等待最多在给定的时间计算完成,然后检索其结果
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class FutureTest01 {

public static void main(String[] args) throws Exception {
ExecutorService threadPool = Executors.newSingleThreadExecutor();

// task 需要运行 3 秒
SimpleTask task = new SimpleTask(3_000);
Future<Double> future = threadPool.submit(task);
threadPool.shutdown(); // 发送关闭线程池的指令

double time = future.get();
System.out.format("任务运行时间: %.3f s\n", time);

}

private static final class SimpleTask implements Callable<Double> {

private final int sleepTime; // ms

public SimpleTask(int sleepTime) {
this.sleepTime = sleepTime;
}

@Override
public Double call() throws Exception {
double begin = System.nanoTime();

Thread.sleep(sleepTime);

double end = System.nanoTime();
double time = (end - begin) / 1E9;

return time; // 返回任务运行的时间,以 秒 计
}

}

}

如上,创建了一个 SimpleTask,该 task 运行时会休眠 3000ms,运行总时间会大于3000ms。

测试结果:

1
任务运行时间: 3.002 s

在上例基础上,通过 Future.cancel() 取消任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class FutureTest02 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor();

// task 需要运行 3 秒
SimpleTask task = new SimpleTask(3_000);
Future<Double> future = threadPool.submit(task);
// 发送关闭线程池的指令
threadPool.shutdown();

// 在 2 秒之后取消该任务
cancelTask(future, 2_000);

try {
double time = future.get();
System.out.format("任务运行时间: %.3f s\n", time);
} catch (CancellationException ex) {
System.err.println("任务被取消");
} catch (InterruptedException ex) {
System.err.println("当前线程被中断");
} catch (ExecutionException ex) {
System.err.println("任务执行出错");
}

}

private static void cancelTask(final Future<?> future, final int delay) {
Runnable cancellation = () -> {
try {
Thread.sleep(delay);
// 取消与 future 关联的正在运行的任务
future.cancel(true);
} catch (InterruptedException ex) {
ex.printStackTrace(System.err);
}
};
new Thread(cancellation).start();
}

private static final class SimpleTask implements Callable<Double> {
// ms
private final int sleepTime;

public SimpleTask(int sleepTime) {
this.sleepTime = sleepTime;
}

@Override
public Double call() throws Exception {
double begin = System.nanoTime();
Thread.sleep(sleepTime);
double end = System.nanoTime();
double time = (end - begin) / 1E9;
// 返回任务运行的时间,以 秒 计
return time;
}

}
}

这里 task 的运行总时间会大于3000ms的,不过在 2 秒之后取消该任务。运行结果:

1
任务被取消

1.8 调用 Futurecancel(true) 不一定能取消正在运行的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* 判断一个数是否为素数
*/
public class FutureTest03 {

public static void main(String[] args) throws Exception {
ExecutorService threadPool = Executors.newSingleThreadExecutor();

long num = 1000000033L;
PrimerTask task = new PrimerTask(num);
Future<Boolean> future = threadPool.submit(task);
threadPool.shutdown();

// get result
boolean result = future.get();
System.out.format("%d 是否为素数? %b\n", num, result);

}

// 判断一个数是否为素数
private static final class PrimerTask implements Callable<Boolean> {
private final long num;
public PrimerTask(long num) {
this.num = num;
}
@Override
public Boolean call() {
double begin = System.nanoTime();
// i < num 让任务有足够的运行时间
for (long i = 2; i < num; i++) {
if (num % i == 0) {
return false;
}
}
double end = System.nanoTime();
double time = (end - begin) / 1E9;
System.out.format("任务运行时间: %.3f s\n", time);
return true;
}

}

}

Output:

1
2
任务运行时间: 15.033 s
1000000033 是否为素数? true

任务的运行时间,大约是15.033 s

在任务运行到 2 秒的时候调用 Futurecancel(true)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* 判断一个数是否为素数
*/
public class FutureTest04 {

public static void main(String[] args) throws Exception {
ExecutorService threadPool = Executors.newSingleThreadExecutor();

long num = 1000000033L;
PrimerTask task = new PrimerTask(num);
Future<Boolean> future = threadPool.submit(task);
// 发送关闭线程池的指令
threadPool.shutdown();

// 在 2 秒之后取消该任务
cancelTask(future, 2_000);

try {
boolean result = future.get();
System.out.format("%d 是否为素数? %b\n", num, result);
} catch (CancellationException ex) {
System.err.println("任务被取消");
} catch (InterruptedException ex) {
System.err.println("当前线程被中断");
} catch (ExecutionException ex) {
System.err.println("任务执行出错");
}
}

// 判断一个数是否为素数
private static final class PrimerTask implements Callable<Boolean> {
private final long num;
public PrimerTask(long num) {
this.num = num;
}
@Override
public Boolean call() {
double begin = System.nanoTime();
// i < num 让任务有足够的运行时间
for (long i = 2; i < num; i++) {
if (num % i == 0) {
return false;
}
}
double end = System.nanoTime();
double time = (end - begin) / 1E9;
System.out.format("任务运行时间: %.3f s\n", time);
return true;
}

}

// 在 delay ms 后取消 task
private static void cancelTask(final Future<?> future, final int delay) {
Runnable cancellation = () -> {
try {
Thread.sleep(delay);
// 取消与 future 关联的正在运行的任务
future.cancel(true);
} catch (InterruptedException ex) {
ex.printStackTrace(System.err);
}
};
new Thread(cancellation).start();
}
}

Output:

1
2
任务被取消
任务运行时间: 18.395 s

可以发现,虽然我们取消了任务,Futureget 方法也对我们的取消做出了响应(即抛出 CancellationException 异常),但是任务并没有停止,而是直到任务运行完毕了,程序才结束。

原因:

  • 如上代码 Futureget 方法对我们的取消做出了响应:

    1
    2
    3
    catch (CancellationException ex) {
    System.err.println("任务被取消");
    }

    但是任务并没有停止,而是直到任务运行完毕了,程序才结束。

  • 查看一下 FutureTask.cancel() 方法源码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public boolean cancel(boolean mayInterruptIfRunning) {
    // 任务的运行状态,NEW
    // 查看当前任务,在内存中偏移量为 stateOffset 位置的值,是否等于 NEW
    // 如果偏移量为 stateOffset 位置的值等于 NEW,那么将其设置为 INTERRUPTING 或者 CANCELLED
    if (!(state == NEW &&
    UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
    mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
    // 任务偏移量为stateOffset位置的值 != NEW,且任务状态 != NEW
    return false;
    try {
    if (mayInterruptIfRunning) {
    try {
    Thread t = runner;
    if (t != null)
    t.interrupt();
    } finally {
    // 查看当前任务在内存中偏移量为 stateOffset 位置的值,将其值设置为 INTERRUPTED
    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
    }
    }
    } finally {
    finishCompletion();
    }
    return true;
    }

    Unsafe.class 中的 compareAndSwapInt()方法和 putOrderedInt()方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public final class Unsafe {

    /**
    * 读取传入对象 var1 在内存中偏移量为 var2 位置的值与期望值 var4 作比较。
    * 相等就把 var5 值赋值给 var2 位置的值,方法返回true
    * 不相等,就取消赋值,方法返回false。
    */
    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

    /**=
    * 读取传入对象 var1 在内存中偏移量为 var2 位置的值,将其设置为 var4
    */
    public native void putOrderedInt(Object var1, long var2, int var4);
    }
  • FutureTask.cancel() 源码可以知道,在运行状态下的任务,如果我们调用 cancel() 方法,传入 true 为参数,那么接下来会调用 interrupt() 方法将线程状态标记为中断状态。

  • 将线程状态标记为中断状态,但不会终止线程,线程还会继续执行。

1.9 对线程中断做出响应 的任务

如上 FutureTest04 ,这里对线程中断作出响应操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private static final class PrimerTask implements Callable<Boolean> {
private final long num;
public PrimerTask(long num) {
this.num = num;
}
@Override
public Boolean call() {
double begin = System.nanoTime();
// i < num 让任务有足够的运行时间
for (long i = 2; i < num; i++) {
// 在每次进入循环时,进行判断,任务是否被取消
if (Thread.currentThread().isInterrupted()) {
System.out.println("PrimerTask.call() task 被取消 ------- i = " + i);
double end = System.nanoTime();
double time = (end - begin) / 1E9;
System.out.format("任务运行时间: %.3f s\n", time);
return false;
}
if (num % i == 0) {
return false;
}
}
double end = System.nanoTime();
double time = (end - begin) / 1E9;
System.out.format("任务运行时间: %.3f s\n", time);
return true;
}

}

Output:

1
2
3
PrimerTask.call() task 被取消 ------- i = 154513461
任务运行时间: 2.151 s
任务被取消

通过 Thread.currentThread().isInterrupted() 方法,我们可以判断任务是否被取消,从而做出相应的取消任务的响应。

1.10 处理不可中断阻塞

  1. 并非所有的可阻塞方法或者阻塞机制都能响应中断。如果一个线程由于 执行同步的Socket I/O 或者 等待获得内置锁 而阻塞,那么中断请求只能设置线程的中断状态,除此之外没有其他任何作用。

  2. 由于执行不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止这些线程,但这要求我们必须知道线程阻塞的原因:

    • java.io 包中的同步Socket I/O

      InputStream.read() 和 OutputStream.write() 等方法都不会响应中断,通过关闭底层的套接字,可以使得由于执行这些方法的线程被阻塞,抛出一个SocketException。

    • java.nio 包中的同步I/O

      当中断一个正在 InterruptibleChannel 上等待的线程时,将抛出 ClosedByInterruptException 并关闭链路。

      当关闭一个 InterruptibleChannel 时,将导致所有在链路操作上阻塞的线程都抛出 AsynchronousCloseException。

      大多数的 Channel 都实现了 InterruptibleChannel。

    • Selector 的异步I/O

      如果一个线程在调用 Selector.select() 方法时阻塞了,那么调用 close() 或 wakeup() 方法会使线程抛出ClosedSelectorException 并提前返回。

    • 获得锁

      如果一个线程由于等待某个内置锁而阻塞,那么将无法响应中断。在 Lock 类中提供了 lockInterruptibly() 方法,该方法允许在等待一个锁的同时仍能响应中断。

1.11 如何封装非标准的取消操作

1.11.1 覆写 interrupt 来封裝非标准取消

在 Thread 中,通过覆写 interrupt 来封裝非标准取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class ReaderThread extends Thread {
private static final int BUFSZ = 512;
private final Socket socket;
private final InputStream in;

public ReaderThread(Socket socket) throws IOException {
this.socket = socket;
this.in = socket.getInputStream();
}

public void interrupt() {
try {
socket.close();
} catch (IOException ignored) {
} finally {
super.interrupt();
}
}

public void run() {
try {
byte[] buf = new byte[BUFSZ];
while (true) {
int count = in.read(buf);
if (count < 0) {
break;
} else if (count > 0) {
processBuffer(buf, count);
}
}
} catch (IOException e) {
// 允许线程退出
}
}

public void processBuffer(byte[] buf, int count) {
}
}

1.11.2 用 newTaskFor 封装非标准的取消

  1. newTaskFor 是一个工厂方法,它将创建 Future 来代表任务。
  2. newTaskFor 还能返回一个 RunnableFuture 接口,该接口拓展了 Future 和 Runnable (并由FutureTask实现)。
  3. 当把一个 Callable 提交给 ExecutorService 时,submit 方法会返回一个 Future,我们可以通过这个 Future 来取消任务。
  4. 通过定制表示任务的 Future 可以改变 Future.cancel 的行为。

使用 newTaskFor 封装非标准的取消:

  • 创建一个 Callable Interface

    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
    * CancellableTask
    * @param <T> result obj
    */
    interface CancellableTask <T> extends Callable<T> {
    void cancel();

    RunnableFuture<T> newTask();
    }
  • 编写 CancellableTask 实现类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public abstract class SocketUsingTask <T> implements CancellableTask<T> {
    @GuardedBy("this")
    private Socket socket;

    protected synchronized void setSocket(Socket s) {
    socket = s;
    }

    public synchronized void cancel() {
    try {
    if (socket != null)
    socket.close();
    } catch (IOException ignored) {
    }
    }

    /**
    * 创建一个 RunnableFuture,重写了 FutureTask.cancel() 方法
    * @return RunnableFuture
    */
    public RunnableFuture<T> newTask() {
    return new FutureTask<T>(this) {
    // 重写 FutureTask 的 cancel() 方法
    public boolean cancel(boolean mayInterruptIfRunning) {
    try {
    // cancel this SocketUsingTask
    SocketUsingTask.this.cancel();
    } finally {
    return super.cancel(mayInterruptIfRunning);
    }
    }
    };
    }
    }

    该实现类的 RunnableFuture.cancel() 方法,首先关闭当前对象套接字,随后关闭 RunnableFuture

  • 复写一个 Executor,其 newTaskFor() 方法针对 obj 类型进行 return,RunnableFuture 类型返回 SocketUsingTask.newTask()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    @ThreadSafe
    class CancellingExecutor extends ThreadPoolExecutor {
    public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    if (callable instanceof CancellableTask) {
    return ((CancellableTask<T>) callable).newTask();
    } else {
    return super.newTaskFor(callable);
    }
    }
    }

2. 停止基于线程的服务

2.1 停止基于线程的服务

  • 应用程序通常会创建拥有线程的服务,比如线程池,这些服务的存在时间通常比创建它们的方法存在的时间更长。

  • 线程通过一个 Thread 对象表示,像其它对象一样,线程可以被自由的共享。线程API中并没有关于线程所属权正规的概念。

  • 如果应用程序优雅地退出,这些服务的线程也需要结束。因为没有退出线程惯用的优先方法,它们需要自行结束。

  • 通常我们使用线程池来创建线程和关闭线程。线程池即为线程的拥有者,操作线程、改变优先级等都由线程池来负责。

  • 服务应该提供生命周期方法来关闭自己,并关闭它所拥有的线程。当我们关闭应用程序这个服务时,服务就可以关闭所有的线程了。

  • ExecutorService 提供了 shutdown()、shutdownNow() 方法,其它持有线程的服务,也应该提供类似的关闭机制。

对于持有线程的服务,主要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。

2.2 示例:日志服务

不支持关闭的生产者-消费者日志服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import java.io.PrintWriter;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class LogWriter {
private final BlockingQueue<String> queue;
private final LoggerThread logger;
private static final int CAPACITY = 1000;

public LogWriter(Writer writer) {
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
this.logger = new LoggerThread(writer);
}

public void start() {
logger.start();
}

public void log(String msg) throws InterruptedException {
queue.put(msg);
}

/**
* 日记线程 class
*/
private class LoggerThread extends Thread {
private final PrintWriter writer;

public LoggerThread(Writer writer) {
this.writer = new PrintWriter(writer, true);
}

public void run() {
try {
while (true) {
writer.println(queue.take());
}
} catch (InterruptedException ignored) {
} finally {
writer.close();
}
}
}
}

如上 LogWriter 为一个生产者-消费者模型,日志活动被分离到一个单独的日志线程中。产生消息的线程不会将消息直接写入输出流,而是由 LogWriter 通过 BlockingQueue 把这个任务提交给日志线程,并由日志线程写入。

在正常情况下,如何关闭日志线程?

  • 取消一个生产者-消费者活动,既要取消生产者,又要取消消费者。
  • 中断日志线程,应着手处理消费者,但是如果消费者和生产者不在同一个线程(如上LogWriter),生产者线程实现一个“检查在运行”
    • 生产者观察是否被关闭服务,
      • 如若关闭服务了,停止消息入队
      • 否则继续消息入队
    • 消费者正常消费消息,如若队列已空(被阻塞),检查是否关闭了服务。
      • 如果关闭了服务,关闭消费者进程,然后关闭生产者进程
      • 如果未关闭服务,继续等待,直到有消息入队
    • 对于生产者而已,可能会存在,队列已满,消息被阻塞,无法入队。

向日志服务添加不可靠的关闭支持

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.io.PrintWriter;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class LogWriter2 {
private final BlockingQueue<String> queue;
private final LoggerThread2 logger;
private static final int CAPACITY = 1000;

private boolean isShutdownRequested = false;

public LogWriter2(Writer writer) {
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
this.logger = new LoggerThread2(writer);
}

public void start() {
logger.start();
}

public void log(String msg) throws InterruptedException {
queue.put(msg);
if (!isShutdownRequested) {
queue.put(msg);
} else {
throw new IllegalStateException("logger is shut down");
}
}

public void setShutdownStatus(boolean status) {
this.isShutdownRequested = status;
}

/**
* 日记线程 class
*/
private class LoggerThread2 extends Thread {
private final PrintWriter writer;

public LoggerThread2(Writer writer) {
this.writer = new PrintWriter(writer, true);
}

public void run() {
try {
while (true) {
writer.println(queue.take());
}
} catch (InterruptedException ignored) {
} finally {
writer.close();
}
}
}
}

这些不能解决最基本的问题,可能会导致失败,创建新日志消息的各个子任务都必须是原子操作。

向 LogWriter 中添加可靠的取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import net.jcip.annotations.GuardedBy;

import java.io.PrintWriter;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class LogService {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
@GuardedBy("this")
private boolean isShutdown;
@GuardedBy("this")
private int reservations;

public LogService(Writer writer) {
this.queue = new LinkedBlockingQueue<String>();
this.loggerThread = new LoggerThread();
this.writer = new PrintWriter(writer);
}

public void start() {
loggerThread.start();
}

public void stop() {
synchronized (this) {
isShutdown = true;
}
loggerThread.interrupt();
}

/** 生产 */
public void log(String msg) throws InterruptedException {
synchronized (this) {
if (isShutdown) {
throw new IllegalStateException("LogService Is Shut Down");
}
++ reservations;
}
queue.put(msg);
}

/** 消费 */
private class LoggerThread extends Thread {
public void run() {
try {
while (true) {
try {
synchronized (LogService.this) {
if (!isShutdown || reservations != 0) {
// 队列中存在消息,LogService 未关闭
} else {
break;
}
}
// 消费
String msg = queue.take();
synchronized (LogService.this) {
--reservations;
}
writer.println(msg);
} catch (InterruptedException e) {
}
}
} finally {
writer.close();
}
}
}
}

2.3 关闭 ExecutorService

在 ExecutorService 中提供了 shutdown()、shutdownNow() 方法对其进行关闭:

1

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 调用这个方法时,ExecutorService 停止接受任何新的任务
* 且等待已经提交的任务执行完成,当所有已经提交的任务执行完毕后将会关闭ExecutorService
* (已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的)
*/
void shutdown();

/**
* 调这个方法会强制关闭 ExecutorService,
* 它将取消所有运行中的任务和在工作队列中等待的任务,
* 这个方法返回一个 List 列表,列表中返回的是等待在工作队列中的任务
*/
List<Runnable> shutdownNow();

使用 ExcutorService 的日志服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import java.io.PrintWriter;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LogService2 {
private int THREAD_NUM = 10;
private final ExecutorService exec = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM, 0,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(CAPACITY));
private final long TIMEOUT = 10000;
private final TimeUnit UNIT = TimeUnit.MILLISECONDS;
private final WriteTask writer;
private final BlockingQueue<String> queue;
private static final int CAPACITY = 1000;

public LogService2(Writer writer) {
this.writer = new WriteTask(writer);
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
}

public void start() {}

public void stop() throws InterruptedException {
try{
exec.shutdown();
exec.awaitTermination(TIMEOUT, UNIT);
} finally {
writer.close();
}
}

public void log(String msg) {
try{
queue.put(msg);
exec.execute(writer);
} catch (RejectedExecutionException ignored) {

} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private class WriteTask implements Runnable {

private final PrintWriter writer;

public WriteTask(Writer writer) {
this.writer = (PrintWriter) writer;
}

public void close() {
this.writer.close();
}

@Override
public void run() {
try {
while (true) {
writer.println(LogService2.this.queue.take());
}
} catch (InterruptedException ignored) {
} finally {
writer.close();
}
}
}
}

2.4 致命药丸

致命药丸:一个置于队列中,可识别的对象。

我们可以使用 致命药丸 来关闭生产者-消费者服务。

使用致命药丸来关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class IndexingService {
private static final int CAPACITY = 1000;
private static final File POISON = new File("");
private final IndexerThread consumer = new IndexerThread();
private final CrawlerThread producer = new CrawlerThread();
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;

public IndexingService(File root, final FileFilter fileFilter) {
this.root = root;
this.queue = new LinkedBlockingQueue<File>(CAPACITY);
// 创建一个文件过滤器对象,该 root 下的所有文件,会被生产者以这过滤器为条件,将所有文件使用【文件爬虫】添加到队列中
this.fileFilter = new FileFilter() {
/**
* 测试指定的抽象路径名是否应包含在路径名列表中
* @param pathname 路径名
* @return true/false
*/
public boolean accept(File pathname) {
return pathname.isDirectory() || fileFilter.accept(pathname);
}
};
}

/** 文件是否已经加入到队列中,false表示未加入 */
private boolean alreadyIndexed(File f) {
return false;
}

public void start() {
producer.start();
consumer.start();
}

public void stop() {
producer.interrupt();
}

public void awaitTermination() throws InterruptedException {
consumer.join();
}

/**
* 生产者:文件爬虫
*/
class CrawlerThread extends Thread {
public void run() {
try {
// 检索文件
crawl(root);
} catch (InterruptedException e) {
} finally {
while (true) {
try {
// 将药丸添加到队列中
queue.put(POISON);
break;
} catch (InterruptedException e1) {}
}
}
}

/**
* 检索文件
* @param root 文件/目录
*/
private void crawl(File root) throws InterruptedException {
// 【文件集合 + 文件夹集合】 的 path
File[] entries = root.listFiles(fileFilter);
// 检索文件 path,并添加到队列中
if (entries != null) {
for (File entry : entries) {
if (entry.isDirectory()) {
crawl(entry);
} else if (!alreadyIndexed(entry)) {
queue.put(entry);
}
}
}
}
}

/**
* 消费者:对检索出来的文件进行处理
*/
class IndexerThread extends Thread {

/** 对检索出来的文件进行处理,知道遇到【药丸】文件 */
public void run() {
try {
while (true) {
File file = queue.take();
if (file == POISON)
break;
else
indexFile(file);
}
} catch (InterruptedException consumed) {
}
}

/** 处理文件 */
public void indexFile(File file) {
};
}
}

2.5 示例:只执行一次的服务

如果一个方法需要处理一批任务,并在所有任务结束前不会返回,那么它可以通过使用私有的 Executor 来简化服务的生命周期管理,其中 Executor 的寿命限定在该方法中 (在这种情况下,通常会用到 invokeAllinvokeAny 方法)。

  • invokeAll() 方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * 提交任务集到 ExecutorService,返回所有任务的执行结果。
    * 该方法会阻塞,必须等待所有的任务执行完成后统一返回。
    *
    * 如果全部任务在指定的时间内没有完成,则抛出异常。
    * @param tasks Collection<? extends Callable<T>>
    * @param timeout 超时时间
    * @param unit 时间单位
    * @return 执行后,返回的是 <T> List<Future<T>> 对象
    */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
  • invokeAny() 方法

    1
    2
    3
    4
    5
    /** 提交任务集到 ExecutorService,返回第一个执行完的任务的结果值 */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

    /** 提交任务集到 ExecutorService,在指定时间内,返回第一个执行完的任务的结果值,否则 Exception */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

在多台主机上并行的检查新邮件

使用私有 Executor,将它的寿命限定于一次方法调用中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class CheckForMail {
public boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final AtomicBoolean hasNewMail = new AtomicBoolean(false);
try {
for (final String host : hosts) {
exec.execute(new Runnable() {
public void run() {
if (checkMail(host)) {
hasNewMail.set(true);
}
}
});
}
} finally {
exec.shutdown();
exec.awaitTermination(timeout, unit);
}
return hasNewMail.get();
}

/** 检查邮件 */
private boolean checkMail(String host) {
return false;
}
}

2.6 shutdownNow() 的局限性

1
2
3
4
5
6
/**
* 调这个方法会强制关闭 ExecutorService,
* 它将取消所有运行中的任务和在工作队列中等待的任务,
* 这个方法返回一个 List 列表,列表中返回的是等待在工作队列中的任务
*/
List<Runnable> shutdownNow();
  • 当通过 shutdownNow 来强行关闭 ExecutorService 时,它会尝试取消正在运行的任务并返回所有已提交但尚未开始的任务
  • 我们无法通过常规方法来找出哪些任务已经开始但尚未结束。这意味着我们无法在关闭过程中知道正在执行的任务的状态,除非任务本身会执行某种检查。

关闭之后,ExecutorService 获取被取消的任务

TrackingExecutor 中给出了如何在关闭过程中判断正在执行的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class TrackingExecutor extends AbstractExecutorService {
private final ExecutorService exec;

/** 正在执行,且为完成的 runnable */
private final Set<Runnable> tasksCancelledAtShutdown = Collections.synchronizedSet(new HashSet<Runnable>());

public TrackingExecutor(ExecutorService exec) {
this.exec = exec;
}

public void shutdown() {
exec.shutdown();
}

public List<Runnable> shutdownNow() {
return exec.shutdownNow();
}

public boolean isShutdown() {
return exec.isShutdown();
}

public boolean isTerminated() {
return exec.isTerminated();
}

public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return exec.awaitTermination(timeout, unit);
}

public List<Runnable> getCancelledTasks() {
if (!exec.isTerminated()) {
throw new IllegalStateException("线程池未关闭");
}
return new ArrayList<Runnable>(tasksCancelledAtShutdown);
}

public void execute(final Runnable runnable) {
exec.execute(new Runnable() {
public void run() {
try {
runnable.run();
} finally {
// 线程池关闭,且当前线程被标志为中断状态,那么将当前线程添加到 set 集合中
if (isShutdown() && Thread.currentThread().isInterrupted()) {
tasksCancelledAtShutdown.add(runnable);
}
}
}
});
}
}

使用 TrackingExecutorService 为后续执行来保存未完成的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import net.jcip.annotations.GuardedBy;

import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
* 网络爬虫
*/
public abstract class WebCrawler {
/** 线程池:关闭该线程池,能获得正在执行的线程任务 */
private volatile TrackingExecutor exec;
/** 要爬取的 url */
@GuardedBy("this")
private final Set<URL> urlsToCrawl = new HashSet<URL>();
/** 已经爬取的 */
private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<>();
private static final long TIMEOUT = 500;
private static final TimeUnit UNIT = MILLISECONDS;

public WebCrawler(URL startUrl) {
urlsToCrawl.add(startUrl);
}

public synchronized void start() {
exec = new TrackingExecutor(Executors.newCachedThreadPool());
for (URL url : urlsToCrawl) {
submitCrawlTask(url);
}
urlsToCrawl.clear();
}

/** 关闭线程池,将正在执行的爬虫程序对应的 url 保存起来到【urlsToCrawl】对象中 */
public synchronized void stop() throws InterruptedException {
try {
saveUnCrawled(exec.shutdownNow());
if (exec.awaitTermination(TIMEOUT, UNIT)) {
saveUnCrawled(exec.getCancelledTasks());
}
} finally {
exec = null;
}
}

private void saveUnCrawled(List<Runnable> unCrawled) {
for (Runnable task : unCrawled) {
urlsToCrawl.add(((CrawlTask) task).getPage());
}
}

/** 启动爬虫task,爬取url */
private void submitCrawlTask(URL u) {
exec.execute(new CrawlTask(u));
}

/** 爬虫 */
private class CrawlTask implements Runnable {
private final URL url;

CrawlTask(URL url) {
this.url = url;
}

private int count = 1;

/** 已经爬取的 url */
boolean alreadyCrawled() {
return seen.putIfAbsent(url, true) != null;
}


void markUnCrawled() {
seen.remove(url);
System.out.printf("marking %s unCrawled%n", url);
}

/** 爬取 */
public void run() {
for (URL link : processPage(url)) {
if (Thread.currentThread().isInterrupted()) {
return;
}
submitCrawlTask(link);
}
}

public URL getPage() {
return url;
}
}

protected abstract List<URL> processPage(URL url);

private class TrackingExecutor extends AbstractExecutorService {
private final ExecutorService exec;

/** 正在执行,且为完成的 runnable */
private final Set<Runnable> tasksCancelledAtShutdown = Collections.synchronizedSet(new HashSet<Runnable>());

public TrackingExecutor(ExecutorService exec) {
this.exec = exec;
}

public void shutdown() {
exec.shutdown();
}

public List<Runnable> shutdownNow() {
return exec.shutdownNow();
}

public boolean isShutdown() {
return exec.isShutdown();
}

public boolean isTerminated() {
return exec.isTerminated();
}

public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return exec.awaitTermination(timeout, unit);
}

public List<Runnable> getCancelledTasks() {
if (!exec.isTerminated()) {
throw new IllegalStateException("线程池未关闭");
}
return new ArrayList<Runnable>(tasksCancelledAtShutdown);
}

public void execute(final Runnable runnable) {
exec.execute(new Runnable() {
public void run() {
try {
runnable.run();
} finally {
// 线程池关闭,且当前线程被标志为中断状态,那么将当前线程添加到 set 集合中
if (isShutdown() && Thread.currentThread().isInterrupted()) {
tasksCancelledAtShutdown.add(runnable);
}
}
}
});
}
}

}

3. 处理反常的线程终止

3.1 处理反常的线程终止

  • 当并发程序中的某个线程发生故障使控制台中可能会输出栈追踪信息,但是没有人会观察控制台。

  • 此外,当线程发生故障时,应用程序可能看起来仍然在工作,所以这个失败很可能被忽略。

  • 幸运的是,我们有可以监测并防止程序中“遗漏”线程的方法。

导致线程死亡的最主要原因就是RuntimeException。这是 unchecked 异常,程序默认会在控制台输出栈追踪信息,并终止线程。

典型的线程池工作者线程结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void run(){
Throwable thrown = null;
try{
// 线程未中断,运行线程
while(!isInterrupted()) {
runTask(getTaskFromWorkueue());
}
} catch (Throwable e) {
// 捕获异常
thrown = e;
} finally {
// 终止线程
threadExited(this, throw);
}
}

3.2 未捕获异常的处理

Thread api 中同样提供了 UncaughtExceptionHandler ,它能检测出某个线程由于未捕获的异常而终结的情况。

1
2
3
4
5
6
7
8
9
10
/**
* 当线程由于未捕获的异常,而突然终止时,调用的处理程序接口。
*/
@FunctionalInterface
public interface UncaughtExceptionHandler {
/**
* 由于给定的线程,未捕获异常而终止时调用的方法。
*/
void uncaughtException(Thread t, Throwable e);
}

当一个线程由于未捕获异常而退出时,JVM 会把这个事件报告给应用程序提供的 UncaughtExceptionHandler 异常处理器。如果没有提供任何异常处理器,那么默认的行为是将栈追踪信息输出到 System.err

异常处理器如何处理未捕获异常,取决于对服务质量的需求。最常见的响应方式是将一个错误信息以及相应的栈追踪信息写入应用程序日志中。

1
2
3
4
5
6
7
8
9
10
11
12
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Created by osys on 2022/08/28 21:48.
*/
public class UEHLogger implements Thread.UncaughtExceptionHandler {
public void uncaughtException(Thread thread, Throwable throwable) {
Logger logger = Logger.getAnonymousLogger();
logger.log(Level.SEVERE, "线程异常终止: " + thread.getName(), throwable);
}
}

在运行时间较长的应用程序中,通常会为所有线程的未捕获异常指定同一个异常处理器,并且该处理器至少会将异常信息记录到日志中。

  • 要为线程池中的所有线程设置一个 UncaughtExceptionHandler,需要为 ThreadPoolExecutor 的构造函数提供一个 ThreadFactory

    1
    2
    3
    4
    5
    6
    7
    8
    public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {
    }
  • 标准线程池允许当发生未捕获异常时结束线程,但由于使用了一个 try-finally 代码来接收通知,因此当线程结束时,将有新的线程来代替它。

  • 如果没有提供捕获异常处理器或者其他的故障通知机制,那么任务会悄悄失败,从而导致极大的混乱。

  • 如果你希望在任务由于发送异常而失败时获得通知并且执行一些特定于任务的恢复操作,那么可以将任务封装在能捕获异常的 RunnableCallable 中,或者改写 ThreadPoolExecutor.afterExecute() 方法。

    1
    protected void afterExecute(Runnable r, Throwable t) { }

只有通过 execute() 提交的任务,才能将它抛出的异常交给未捕获异常处理器,而通过 submit() 提交的任务,会被封装成ExecutionException 抛出。

1
2
3
4
5
6
/**
* 在未来的某个时间执行给定的任务。
* 该任务可以在新线程或现有池线程中执行。
* 如果任务无法提交执行,要么是因为这个执行器已经关闭,要么是因为它的容量已经达到,任务由当前的RejectedExecutionHandler处理。
*/
public void execute(Runnable command) {}

ThreadPoolExecutor 重写了父接口 Executorexecute() 方法。而 submit() 方法是父接口 AbstractExecutorService 的,且 ThreadPoolExecutor 未将其重写并交予RejectedExecutionHandler处理。

4. JVM关闭

4.1 JVM 关闭

  • JVM既可以正常关闭也可以强行关闭。
  • 正常关闭的触发方式有多种,包括:当最后一个“非守护“线程结束时,或者调用System.exit时,或者通过其他特定平台的方法关闭时(如:Crtl+C、发送signet信号 等)

4.2 关闭钩子

关闭钩子:是指通过 Runtime.addShutdownHook 注册的但尚未开始的线程。

  1. 在正常关闭中,JVM首先调用所有已注册的关闭钩子(Shutdown hook)。
  2. JVM并不能保证关闭钩子的调用顺序。
  3. 在关闭应用程序线程中,如果线程仍然在运行,那么这些线程接下来和关闭进程并发执行。
  4. 如果 runFinalizerOnExittrue。那么JVM将运行终结器,然后再停止。
  5. JVM并不会停止或中断任何在关闭时仍然运行的应用程序线程。当JVM最终结束时,这些线程将被强行结束。
  6. 如果关闭钩子或终结器没有执行完成,那么正常关闭进程“挂起”并且JVM必须被强行关闭。
  7. 当强行关闭时,只是关闭JVM,而不会运行关闭钩子。

关闭钩子应该是线程安全的:

  • 它们在访问共享数据时必须使用同步机制,并且小心的避免死锁,这和其他并发代码的要求相同。
  • 而且,关闭钩子不应该对应用程序的状态或者JVM的关闭原因作出任何假设。
  • 最后,关闭钩子应该尽快退出,因为它们会延迟JVM的结束时间,而用户可能希望JVM尽快终止。

关闭钩子可以用于实现服务或应用程序的清理工作,例如清理临时文件。

由于关闭钩子将并发执行,因此在关闭日志文件时可能导致其他需要日志服务的关闭钩子产生问题。实现这种功能的一种方式是对所有服务使用同一个关闭钩子,并且在关闭钩子中执行一系列的关闭操作。

注册关闭钩子来停止日志服务

1
2
3
4
5
6
7
8
9
10
11
public void start(){
Runtime.getRuntime().addShutdownHook(new Thread(){
public void run(){
try{
LogService.this.stop();
} catch(InterruptedException ignored){

}
}
}
}

4.3 守护线程

  • 线程分为两种:普通线程守护线程
  • 在JVM启动时创建的所有线程中,除了主线程,其他都是守护线程(例如GC等)。

普通线程和守护线程的差异仅仅在于当线程退出时发生的操作。当一个线程退出时,JVM会检查其他正在运行的线程,如果这些线程都是守护线程,JVM会正常退出操作,当JVM停止时,所有仍然存在的守护线程都将被抛弃 —- 既不会执行 finally 代码块,也不会执行回卷栈,而JVM只是直接退出。

此外,守护线程通常不能用来代替应用程序管理程序中各个服务的生命周期。

4.4 终结器 Finalize

  • 当不再需要内存资源时,可以通过GC来回收他们,但对于其他一些资源,例如文件句柄或套接字句柄,必须显式的还给操作系统。

  • 为了实现这个功能,垃圾回收器对那些定义了 finalize() 方法的对象会进行特殊处理:在回收器释放它们后,调用它们的 finalize 方法,从而保证一些持久化资源被释放。

    1
    2
    3
    public class Object {
    protected void finalize() throws Throwable { }
    }

由于 终结器 可以在某个由JVM管理的线程中运行因此终结器访问的任何状态都可能被多个线程访问,这样就必须对其访问操作进行同步。

在大多数情况下,通过 finally 代码块显式的 close() 方法,能够比使用终结器更好的管理资源。唯一的例外是:当需要管理对象,并且该对象持有的资源是通过本地方法获得的。

避免使用终结器。

Java并发编程实战:第7章 取消和关闭

https://osys.github.io/posts/3df5.html

作者

Osys

发布于

2022年08月29日

更新于

2022年08月29日

许可协议

评论