简介
前面提过,如果Future集合用于存放执行结果,执行任务,最后遍历Future集合获取结果;因为Future.get()
方法是阻塞的,因此不能及时获取已完成任务的执行结果。所以JUC提供了一个ExecutorService
来获取结果,使得任务的提交和结果的获取都能做到异步,从而实现真正的异步。
简单示例
下面是一个使用ExecutorService的简单demo,这个例子演示了poll
方法的使用,方法poll的作用是获取并移除表示下一个已完成任务的Future,如果不存在这样的任务,则返回null,方法poll是无阻塞的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class ExecutorServiceStudy { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); CompletionService<String> service = new ExecutorCompletionService<String>(executorService); service.submit(new Callable<String>() { @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(3); System.out.println("3 seconds pass."); return "3秒"; } }); System.out.println(service.poll()); executorService.shutdown(); } }
|
源码分析
下面是我从javaDoc,翻译的ExecutorService的官方解释,如果您的英文比较好,可以直接看英文原文,毕竟我的英语也只是个二把刀
1
| 一个Service用于解耦任务的异步提交和任务完成结果的获取。生产者提交任务执行。消费者取出完成的任务并按照完成的顺序来获取结果。(注意这里的完成顺序不一定是任务的提交顺序)。例如,CompletionService可用于管理异步I/O,其中执行读取的任务在程序或系统的某个部分中提交,然后在读取完成时在程序的不同部分中执行,可能在不同于他们提交的顺序。通常,CompletiongService依赖一个单独的线程池去执行任务,所有CompletionService仅仅管理内部的任务完成队列。ExecutorCompletionService是它的一个默认实现。
|
我们首先看看ExecutorService这个接口的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public interface CompletionService<V> { Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }
|
从上面看,接口的定义相对来说比较简单,定义了来个提交任务的方法,和三个获取任务结果的方法,其中一个会阻塞等待直到有任务完成,另俩个非阻塞获取任务结果的方法。
具体实现:ExecutorCompletionService
首先谈谈我还没看这个类时的想法,我一看这个类的功能,想着实现这个接口肯定需要大量的代码,而且也会超级复杂。但是当我看到代码的时候,真的不得不佩服doug lea大师的抽象能力。
这里首先看下属性和一个内部类,这个后面都会用到:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() {
completionQueue.add(task);
} private final Future<V> task; }
|
任务的提交:submit
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }
public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; }
private RunnableFuture<V> newTaskFor(Callable<V> task) { if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); }
private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); }
|
整体代码还是比较简单,使用newTaskFor对提交的任务进行同意封装,然后提交到线程池中运行,不过这里需要对任务进一步封装,使用QueueingFuture来进行包装一层,而这个类前面已经说过就是个简单的扩展了FutureTask方法done的类。
完成任务的获取
1 2 3 4 5 6 7 8 9 10 11 12
| public Future<V> take() throws InterruptedException { return completionQueue.take(); }
public Future<V> poll() { return completionQueue.poll(); }
public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); }
|
这个我觉得都不需要解释,就是对队列的操作。队列可以是JUC里面的任意队列类,这里默认使用的是LinkedBlockQueue。我觉得构造方法比较简单,就没有在这里列出,里面定义了如何传入BlockQueue。
总结
从这个其实我们可以看出,类的实现非常简单,基本上都是在复用JUC中已经存在的类,但是这个是因为JUC的整体抽象设计的非常好,所以这里才会做的那么简单,但功能实现却很完美,这个真的值得我们去思考,如何才能设计出这样的工具类。