概要
- Callable和Future简介
- ThreadPoolExecutor中submit分析
- FutureTask源码分析
1. Callable 和 Future 简介
Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,虽然run能写入到日志文件或者将结果放入某个共享的数据结构,但它不能返回一个值或抛出一个受检查的异常。
许多任务实际上都是存在延迟的计类如执行数据库插叙、网络上获取资源,或者计算某个复杂的功能,对于这些任务,Callable是一种更好的抽象:他认为主入口点(即call)将返回一个值,并可能抛出一个异常。
在Executor中包含了一些辅助的方法能将其他类型的任务封装为一个Callabe,其实主要是Runnable类型对象。
Runnable和Callable描述的都是抽象的计算任务。这些任务通常是有范围的,即都有一个明确的起始点,并且最终会结束。Executor执行的任务有4个声明周期阶段:创建、提交、开始和完成。由于有些任务可能要执行很长的时间,因此通常希望能够取消这些任务。在Executor框架中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只能当他们响应中断时,才能取消,也就是你自己在封装任务的时候,在里面封装了响应中断的逻辑,即Thread.interput()
。
Future表示一个任务的生命周期,并定义相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。并且在Future规范中的隐含意义是,任务声明周期只能前进,不能后退,就像ExecutorService的生命周期一样。当某个任务完成后,他就永远停留在完成状态上。
Callable
Callable是一个接口,它只包含一个call()方法。Callable是一个返回结果并且可能抛出异常的任务。为了便于理解,我们可以将Callable比作一个Runnable接口,而Callable的call()方法则类似于Runnable的run()方法。
Callable的源码如下:
1 2 3
| public interface Callable<V> { V call() throws Exception; }
|
说明:从中我们可以看出Callable支持泛型。
Future
Future 是一个接口。它用于表示异步计算的结果。提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。
Future的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning)
boolean isCancelled()
boolean isDone()
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
|
ThreadPoolExecutor中submit分析
前面我们已经对ThreadPoolExecutor中execute进行了分析,在execute中执行的任务是没有返回结果。这在很大程度上限制了这个方法的使用,因此在ExecutorService中提供了submit方法,可以在任务执行完成后返回结果,这个方法有三个重载方法,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
|
通过看上面的代码,大体上代码的结构都是相同的,首先通过newTaskFor方法创建一个RunnableFuture对象,然后使用execute执行这个任务。
下面我们看一下newTaskFor这个方法,这个方法也包括俩个重载的方法,源码如下
1 2 3 4 5 6 7
| protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
|
这个方法是将提供的Runnable和Callable接口封装在FutureTask内,然后返回一个RunnableFuture对象,事实上,FutureTask实现了RunnablFuture这个接口,下面我们来具体看一下FutureTask
FutureTask源码分析
Future继承体系如下
从上面可以看出FutureTask实现了RunnableFuture,而这个接口就是讲Future和Runnable俩个接口集成在一起。所以上面newTaskFor方法返回RunnableFuture,这也体现了面向接口编程,方便以后进行扩展。
下面开始分析源码,首先介绍后面要用到的属性已经这个对象的生命周期(毕竟Future的一个责任就是检测任务是否已完成)
1 2 3 4 5 6 7 8 9 10 11
| private volatile int state;
private Callable<V> callable;
private Object outcome;
private volatile Thread runner;
private volatile WaitNode waiters;
|
下面我们先来了解一下,Future表示一个任务的状态,有以下几种,
1 2 3 4 5 6 7
| NEW :任务新创建状态 COMPLETING :任务完成状态 NORMAL :正常完成状态 EXCEPTIONAL :异常完成状态 CANCELLED :取消状态 INTERRUPTING :正在中断状态 INTERRUPTED :已经被中断状态
|
状态只能从一个状态转变到另外一个状态,不能后退,状态的转换大致上有以下几种:
1 2 3 4
| NEW -> COMPLETING -> NORMAL NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED NEW -> INTERRUPTING -> INTERRUPTED
|
FutureTask构造函数
1 2 3 4 5 6 7 8 9 10 11 12 13
| public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; }
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; }
|
下面来具体分析一下 FutureTask中run函数
在newTaskFor()新建一个ftask对象之后,会通过execute(ftask)执行该任务。此时ftask被当作一个Runnable对象进行执行,最终会调用到它的run()方法:源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
|
说明:run()中会执行Callable对象的call()方法,如果正常执行完成,最终将结果保存到result中,通过set(result)将result保存,并唤醒所有等待的获取结果的线程。 或者执行遇到异常,通过setException将异常保存,并唤醒所有等待结果的线程。现在我们来具体看看set和setException这俩个函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); finishCompletion(); } }
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); finishCompletion(); } }
|
其实方法大体上是类似的,首先通过CAS变量设置任务状态,并设置执行的结果或异常到outcome中,然后唤醒所有的等待的线程,这个等待的线程是通过future.get()获取任务结果而造成的等待,这里我们先说这个方法,然后在来说唤醒等待。
FutureTask的get函数源码
get是用来得到任务执行的结果,如果任务没有执行完成,就会暂停当前任务的执行:
1 2 3 4 5 6
| public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
|
基本上代码逻辑很清晰,先判断任务是否完成,如果没有完成则进入等待直到完成,任务完成之后通过report方法返回结果
我们首先看看后面涉及到的一个对象,不过这里涉及到一个链表节点的表示,源码如下
1 2 3 4 5
| static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
|
上面可以看出,就是将所有等待的线程用链表的形式表示出来,下面具体看看awaitDone方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); }
int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next= waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
|
其实上面逻辑还是比较简单,首先判断任务是否已经执行完成,如果没有则创建一个等待节点,将当前线程插入到等待链表中,然后进入等待状态,等待任务完成被唤醒,同时会让出cpu。如果已经完成,则直接返回。这里为什么用for循环,是因为有可能任务还没有完成,但线程被唤醒,类如线程响应中断信号,这里就需要在此检查唤醒的条件是否是任务已经执行完成(完成还代表任务执行出现异常)。
下面我们来看上面遗留的一个方法,任务执行完成后,唤醒所有等待的线程:源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| private void finishCompletion() { for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; q = next; } break; } } done(); callable = null; }
|
源码比较简单,就不在解释了,我们来看看为什么将唤醒逻辑放置在run方法中。其实这样设计是因为任务是否执行完成,Future是无法决定的,只有执行这个任务的线程才知道,所以就把唤醒线程的代码放在run方法里面。
取消任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }
|
上面取消只有在任务处于NEW状态时才能取消成功,如果强制正在执行的任务取消,则要看任务中有没有响应中断的处理逻辑。这个我们看下面的例子就能明白
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package JUC.executor;
import java.util.concurrent.*;
public class ExecutorStudy {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newSingleThreadExecutor(); Future future = executorService.submit(() -> { while (true) { System.out.println("Thread is running..."); long time = System.currentTimeMillis(); while ((System.currentTimeMillis() - time < 1000)) { } } }); Thread.sleep(1000); while (true) { Thread.sleep(1000); System.out.println(future.cancel(true)); System.out.println(future.isCancelled()); System.out.println(future.isDone()); future.get(); } } }
|
上面就是一个,任务正在执行,我们发出了强制任务取消执行,但是任务依然回执行,因为我们没有在任务执行逻辑中封装响应中断的处理逻辑。这个例子你运行一下就会明白。
总结
- 通过Runable和Callable创建FutureTask对象
- 通过submit方法,提交任务执行,通过get调用获取结果
- 如果已经完成,包括异常和正常结束,则直接返回
- 如果调用get的时候,任务已经在执行,则通过Thread.yield让出线程,等待任务执行完成,不进入阻塞队列
- 如果等待执行,则将当前线程插入阻塞队列,等待被唤醒
- 任务执行完成,设置异常或者完成状态,并返回结果,同时唤醒所有阻塞的线程。