1. 应用线程池
1.1 在任务与执行策略之间的隐性耦合
Executor 框架可以将任务的提交
与任务的执行
策略解耦开来。Executor 框架为制定和修改执行策略提供了相当大的灵活性,但并非所有的任务都能适用所有的执行策略。 有些类型的任 务需要明确地指定执行策略, 包括:
依赖性任务:简单来说就是,提交的 task
是需要依赖 其它任务
的, task
就类似有某种枷锁一样,浑身不自在。
- 大多数任务都是独立的,它们不依赖于其他任务的执行时序、 执行结果或其他效果。
- 当在线程池中执行独立的任务时, 可以随意地改变线程池的大小和配置,这些修改只会对执行性能产生影响。
- 如果提交给线程池的任务需要依赖其他的任务, 那么就隐含地给执行策略带来了约束, 此时必须小心地维持这些执行策略,以避免产生活跃性问题。
采用线程封闭机制的任务:
- 在单线程中的 Executor:能够对并发性做出更强的承诺。 它们能确保任务不会并发地执行, 使我们能够放宽代码对线程安全的要求。
对象
可以封闭在 task 所在的线程
中, 使得在该线程
中执行的 task 在访问该对象时不需要同步, 即使这些资源不是线程安全的也没有问题。
- Executor 从单线程环境改为线程池环境,任务可能会被并发地执行,失去了线程安全性。
对响应时间敏感的任务:
使用ThreadLocal的任务:
- ThreadLocal使每个线程都可以拥有某个变量的一个私有
版本
。然而,只要条件允许,Executor可以自由地重用这些线程。
- 在标准的Executor实现中,当执行需求较低时将回收空闲线程,而当需求增加时将添加新的线程,并且如果从任务中抛出了一个未检查异常,那么将用一个新的工作者线程来替代抛出异常的线程。
- 只有当线程本地值的生命周期受限于任务的生命周期时,在线程池的线程中使用ThreadLocal才有意义。
- 在线程池的线程中不应该使用 ThreadLocal在任务之间传递值。
只有当任务都是同类型的并且相互独立时,线程池的性能才能达到最佳。如果将运行时间较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则将可能造成拥塞
。如果提交的任务依赖于其他任务,那么除非线程池无限大,否则将可能造成死锁
。
1.1.1 线程饥饿死锁
在多线程中,如果某个线程任务,依赖于其它任务的执行,那么就有可能会产生线程死锁。
在单线程的 Executor 中,如果一个任务将另一个任务提交到同一个 Executor,并且等待这个被提交任务的结果,那么通常会引发死锁。
第二个任务停留在工作队列中,并等待第一个任务完成,而第一个任务又无法完成,因为它在等待第二个任务的完成。
在单线程化的 Executor 中死锁的任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;
public class ThreadDeadlock { ExecutorService exec = Executors.newSingleThreadExecutor();
public class LoadFileTask implements Callable<String> { private final String fileName;
public LoadFileTask(String fileName) { this.fileName = fileName; }
public String call() throws Exception { return ""; } }
public class RenderPageTask implements Callable<String> { public String call() throws Exception { Future<String> header, footer; header = exec.submit(new LoadFileTask("header.html")); footer = exec.submit(new LoadFileTask("footer.html")); String page = renderBody();
return header.get() + page + footer.get(); }
private String renderBody() { return ""; } } public void test() { RenderPageTask mainTask = new RenderPageTask(); exec.submit(mainTask); } }
|
例子 ThreadDeadlock.RenderPageTask
将两个 LoadFileTask 添加到单例的 ExecutorService 中。在 tast()
中,我们将 RenderPageTask
添加到单例的 ExecutorService 中,那么会出现 mainTask 等待其他两个 LoadFileTask 的结果,久久不能结束,而那两个 LoadFileTask 却需要等待 mainTask 结束才能被单例的 ExecutorService 执行(久久不能开始)。
1.1.2 耗时操作
执行时间较长的任务不仅会造成线程池堵塞,甚至还会增加执行时间较短任务的服务时间。
如果线程池中线程的数量远小于在稳定状态下执行时间较长任务的数量, 那么到最后可能所有的线程都会运行这些执行时间较长的任务, 从而影响整体的响应性。
限定任务等待资源的时间, 而不要无限制地等待。
例如 Thread.join、BlockingQueue.put、CountDownLatch.await 以及 Selector.select 等。
1.2 定制线程池的大小
在代码中通常不会固定线程池的大小。线程池的长度应该通过某种配置机制来提供,或者根据 Runtime.availableProcessors()
来动态计算。
1 2 3 4
| public class Runtime { public native int availableProcessors(); }
|
- 要设置线程池的大小,需要避免【过大】和【过小】这两种极端情况。
- 如果线程池过大,那么大量的线程将在相对很少的CPU和内存资源上发生竞争,这不仅会导致更高的内存使用量,而且还可能耗尽资源。
- 如果线程池过小,那么将导致许多空闲的处理器无法执行工作,从而降低吞吐率。
- 如果需要执行不同类别的任务,井且它们之间的行为相差很大,那么应该考虑使用多个线程池,从而使每个线程池可以根据各自的工作负载来调整。
大小设定公式
1
| 最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
|
1.3 配置 ThreadPoolExecutor
ThreadPoolExecutor 为一些 Executor 提供了基本的实现,这些 Executor 是由 Executors
中 的 newCachedThreadPool、newFixedThreadPool 和 newScheduledThreadExecutor 等工厂方法返回的。
1.3.1 线程的创建与销毁
核心池大小(core pool size)、最大池的大小(maximum pool size)和存活时间(keep-alive time)共同管理着线程的创建与销段。
核心池大小是目标的大小;线程池的实现试图维护池的大小:即使没有任务执行,池的大小也等于核心池的大小,并且直到工作队列充满前,池都不会创建更多的线程。
最大池的大小是可同时活动的线程数的上限。如果一个线程已经闲置的时间超过了存活时间,它将成为一个被回收的候选者,如果当前的池的大小超过了核心池的大小,线程池会终止它。
Executors.newFixedThreadPool()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } }
|
ThreadPoolExecutor
其中一个构造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
|
1.3.2 管理队列任务
有限线程池限制了可以并发执行的任务的数量。
如果新增任务的频率超过了线程池能过处理它们的速度,任务将在队列中等候。
即使通车平均任务新增都很稳定,也难免会出现突然的激增。尽管队列有助于缓和瞬时的任务激增,但是如果任务持续快速地到来,队列中很多任务等待被执行,这可能会耗尽内存。
ThreadPoolExecutor 允许提供一个 BlockingQueue 来保存等待执行的任务。 基本的任务排队方法有 3 种: 无界队列
、有界队列
和同步移交
(Synchronous Handoff)。
对于庞大或者无限的池,可以使用 SynchronousQueue
,完全绕开队列,将任务直接从生产者移交
给工作者线程。
- SynchronousQueue 并不是一个真正的队列,而是一种管理直接在线程间移交信息的机制。
- 把一个元素放入到 SynchronousQueue 中,必须有另一个线程正在等待接受移交的任务。
- 如果没有这样一个线程,只要当前池的大小还小于最大值,ThreadPoolBxecutor 就会创建一个新的线程;否则根据饱和策略,任务会被拒绝。
- 只有当池是无限的,或者可以接受任务被拒绝,SynchronousQueue 才是一个有实际价值的选揮。
对于先进先出的队列(如:LinkedBlockingQueue、ArrayBlockingQueue),都是顺序执行任务的。如果想要控制任务的执行顺序,可以使用优先队列(PriorityBlockingQueue),通过 Comparator 定义任务优先级。
1.3.3 饱和策略
当一个有限队列充满后,饱和策略开始起作用。ThreadPoolExecutor 的饱和策略可以通过 setRejectedExecutionHandler()
来修改。JDK提供的实现有:AbortPolicy、CallerRunsPolicy、DiscardPolicy 和 DiscardOldestPolicy。
中止(Abort)策略:
默认的饱和策略,该策略将抛出未检查的 RejectedExecutionException。
抛弃最旧的(Discard-Oldest)策略
该策略则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。
调用者运行(Caller-Runs)策略:
该策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。
创建一个可变长的线程池,使用受限队列和 “调用者运行” 饱和策略
1 2 3 4 5 6 7 8 9 10 11 12 13
| private static final Integer CORE_POOL_SIZE = 1000; private static final Integer MAXIMUM_POOL_SIZE = 2000; private static final Long KEEP_ALIVE_TIME = 0L; private static final Integer CAPACITY = 1000;
ThreadPoolExecutor executor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(CAPACITY), new ThreadPoolExecutor.CallerRunsPolicy() );
|
使用 Semaphore 来遏制任务的提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import net.jcip.annotations.ThreadSafe;
import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore;
@ThreadSafe public class BoundedExecutor { private final Executor exec; private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) { this.exec = exec; this.semaphore = new Semaphore(bound); }
public void submitTask(final Runnable command) throws InterruptedException { semaphore.acquire(); try { exec.execute(new Runnable() { public void run() { try { command.run(); } finally { semaphore.release(); } } }); } catch (RejectedExecutionException e) { semaphore.release(); } } }
|
1.3.4 线程工厂
- 每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。
- 默认的线程工厂方法将创建一个新的、非守护的线程,并且没有特殊的配置。
- 通过指定一个线程工厂方法,可以定制线程池的配置信息。
- 在 ThreadFactory 中只定义了一个方法 newThread(),每当线程池需要创建一个新线程时都会调用这个方法。
定制的线程工厂
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger;
public class MyThreadFactory implements ThreadFactory { private final String poolName;
public MyThreadFactory(String poolName) { this.poolName = poolName; }
public Thread newThread(Runnable runnable) { return new MyAppThread(runnable, poolName); } }
|
自定义的线程基类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| class MyAppThread extends Thread { public static final String DEFAULT_NAME = "MyAppThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger();
public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); }
public MyAppThread(Runnable runnable, String name) { super(runnable, name + "-" + created.incrementAndGet()); setUncaughtExceptionHandler(new UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE, "未捕获的异常线程:" + t.getName(), e); } }); }
public void run() { boolean debug = debugLifecycle; if (debug) log.log(Level.FINE, "Created " + getName()); try { alive.incrementAndGet(); super.run(); } finally { alive.decrementAndGet(); if (debug) log.log(Level.FINE, "Exiting " + getName()); } }
public static int getThreadsCreated() { return created.get(); }
public static int getThreadsAlive() { return alive.get(); }
public static boolean getDebug() { return debugLifecycle; }
public static void setDebug(boolean b) { debugLifecycle = b; } }
|
1.3.5 构造后再定制 ThreadPoolExecutor
大多数通过构造函数传递给 ThreadPoolExecutor 的参数(比如核心池大小,最大池大小,存活时间,线程工厂和拒绝执行处理器(rejected execution handler)),都可以在创建 Executor 后通过 setters
进行修改。Executors.newSingleThreadExecutor()
创建的 Executor 除外。
1 2 3 4 5 6
| public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
|
newSingleThreadExecutor() 与其他方法的实现不同,它按 FinalizableDelegatedExecutorService
方式封装的 ExecutorService,而不是原始的 ThreadPoolExecutor。
在 Executors中包含一个 unconfigurableExecutorService() 工厂方法:
1 2 3 4 5
| public static ExecutorService unconfigurableExecutorService(ExecutorService executor) { if (executor == null) throw new NullPointerException(); return new DelegatedExecutorService(executor); }
|
它返回一个现有的 ExecutorService,并对它进行包装。它只暴露出 ExecutorService 的方法,不能进行进一步的配置。
1.4 扩展 ThreadPoolExecutor
ThreadPoolExecutor 是可扩展的, 它提供了几个 “钩子” 方法让子类去复写:
beforeExecute()
1 2 3 4 5 6 7 8 9 10
|
protected void beforeExecute(Thread t, Runnable r) { }
|
afteExecute()
1 2 3 4 5 6 7 8 9 10
|
protected void afterExecute(Runnable r, Throwable t) { }
|
terminated()
1 2
| protected void terminated() { }
|
扩展线程池,以提供日志和计时功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger;
public class TimingThreadPool extends ThreadPoolExecutor {
public TimingThreadPool() { super(1, 1, 0L, TimeUnit.SECONDS, null); }
private final ThreadLocal<Long> startTime = new ThreadLocal<Long>(); private final Logger logger = Logger.getLogger(String.valueOf(TimingThreadPool.class)); private final AtomicLong numTasks = new AtomicLong(); private final AtomicLong totalTime = new AtomicLong();
protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); logger.fine(String.format("Thread %s: start %s", t, r)); startTime.set(System.nanoTime()); }
protected void afterExecute(Runnable r, Throwable t) { try { long endTime = System.nanoTime(); long taskTime = endTime - startTime.get(); numTasks.incrementAndGet(); totalTime.addAndGet(taskTime); logger.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime)); } finally { super.afterExecute(r, t); } }
protected void terminated() { try { logger.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTasks.get())); } finally { super.terminated(); } } }
|
2. 并行递归算法
2.1 并行递归算法
如果每一个循环的每次迭代都睡觉哦独立的,并且我们不必等待所有的迭代都完成后再一次处理,那么我们可以使用 Executor 把一个顺序的循环转化为并行的循环。
把顺序执行转换为并行执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| import java.util.Collection; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;
public abstract class TransformingSequential {
void processSequentially(List<Element> elements) { for (Element e : elements) { process(e); } }
void processInParallel(Executor exec, List<Element> elements) { for (final Element e : elements) { exec.execute(() -> process(e)); } }
public abstract void process(Element e);
interface Element { } }
|
当每个选代彼此独立,并且完成循环体中每个送代的工作,意义都足够重大,足以弥补管理一个新任务的开销时,这个顺序循环是适合并行化的。
把顺序递归转化为并行递归
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| import java.util.Collection; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;
public abstract class TransformingSequential {
public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) { for (Node<T> n : nodes) { results.add(n.compute()); sequentialRecursive(n.getChildren(), results); } }
public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) { for (final Node<T> n : nodes) { exec.execute(() -> results.add(n.compute())); parallelRecursive(exec, n.getChildren(), results); } }
public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); Queue<T> resultQueue = new ConcurrentLinkedQueue<T>(); parallelRecursive(exec, nodes, resultQueue); exec.shutdown(); exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); return resultQueue; }
interface Node <T> { T compute(); List<Node<T>> getChildren(); } }
|
2.2 示例:谜题框架
“搬箱子” 谜题的抽象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import java.util.Set;
public interface Puzzle <P, M> { P initialPosition();
boolean isGoal(P position);
Set<M> legalMoves(P position);
P move(P position, M move); }
|
谜题解决者框架的链节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import net.jcip.annotations.Immutable;
import java.util.LinkedList; import java.util.List;
@Immutable public class PuzzleNode <P, M> { final P pos; final M move; final PuzzleNode<P, M> prev;
public PuzzleNode(P pos, M move, PuzzleNode<P, M> prev) { this.pos = pos; this.move = move; this.prev = prev; }
List<M> asMoveList() { List<M> solution = new LinkedList<M>(); for (PuzzleNode<P, M> node = this; node.move != null; node = node.prev) { solution.add(0, node.move); } return solution; } }
|
顺序话版的谜题解决者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import java.util.HashSet; import java.util.List; import java.util.Set;
public class SequentialPuzzleSolver <P, M> { private final Puzzle<P, M> puzzle; private final Set<P> seen = new HashSet<P>();
public SequentialPuzzleSolver(Puzzle<P, M> puzzle) { this.puzzle = puzzle; }
public List<M> solve() { P pos = puzzle.initialPosition(); return search(new PuzzleNode<P, M>(pos, null, null)); }
private List<M> search(PuzzleNode<P, M> node) { if (!seen.contains(node.pos)) { seen.add(node.pos); if (puzzle.isGoal(node.pos)) { return node.asMoveList(); } for (M move : puzzle.legalMoves(node.pos)) { P pos = puzzle.move(node.pos, move); PuzzleNode<P, M> child = new PuzzleNode<>(pos, move, node); List<M> result = this.search(child); if (result != null) { return result; } } } return null; } }
|
并发版的谜题解决者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| import net.jcip.annotations.GuardedBy; import net.jcip.annotations.ThreadSafe;
import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor;
public class ConcurrentPuzzleSolver <P, M> {
private final Puzzle<P, M> puzzle; private final ExecutorService exec; private final ConcurrentMap<P, Boolean> seen; protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<>();
public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) { this.puzzle = puzzle; this.exec = initThreadPool(); this.seen = new ConcurrentHashMap<P, Boolean>(); if (exec instanceof ThreadPoolExecutor) { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) exec; threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); } }
private ExecutorService initThreadPool() { return Executors.newCachedThreadPool(); }
public List<M> solve() throws InterruptedException { try { P p = puzzle.initialPosition(); exec.execute(newTask(p, null, null)); PuzzleNode<P, M> solutionPuzzleNode = solution.getValue(); return (solutionPuzzleNode == null) ? null : solutionPuzzleNode.asMoveList(); } finally { exec.shutdown(); } }
protected Runnable newTask(P position, M move, PuzzleNode<P, M> puzzleNode) { return new SolverTask(position, move, puzzleNode); }
protected class SolverTask extends PuzzleNode<P, M> implements Runnable { SolverTask(P pos, M move, PuzzleNode<P, M> prev) { super(pos, move, prev); }
public void run() { if (solution.isSet() || seen.putIfAbsent(pos, true) != null) { return; } if (!puzzle.isGoal(pos)) { for (M move : puzzle.legalMoves(pos)) { exec.execute( newTask(puzzle.move(pos, move), move, this) ); } } else { solution.setValue(this); } } } }
|
ConcurrentPuzzleSolver 使用可携带结果的闭锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @ThreadSafe public class ValueLatch <T> { @GuardedBy("this") private T value = null; private final CountDownLatch done = new CountDownLatch(1);
public boolean isSet() { return (done.getCount() == 0); }
public synchronized void setValue(T newValue) { if (!isSet()) { value = newValue; done.countDown(); } }
public T getValue() throws InterruptedException { done.await(); synchronized (this) { return value; } } }
|