本篇文章主要是对ThreadPoolExecutor进行源码分析,讲解线程池是如何实现.
线程池使用例子
线程池状态
任务执行流程分析
线程池使用例子 简单例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import java.util.concurrent.Executors;import java.util.concurrent.ExecutorService;public class ThreadPoolDemo1 { public static void main (String[] args) { ExecutorService pool = Executors.newFixedThreadPool(2 ); Thread ta = new MyThread (); Thread tb = new MyThread (); Thread tc = new MyThread (); Thread td = new MyThread (); Thread te = new MyThread (); pool.execute(ta); pool.execute(tb); pool.execute(tc); pool.execute(td); pool.execute(te); pool.shutdown(); } } class MyThread extends Thread { @Override public void run () { System.out.println(Thread.currentThread().getName()+ " is running." ); } }
运行结果 :
1 2 3 4 5 pool-1-thread-1 is running. pool-1-thread-2 is running. pool-1-thread-1 is running. pool-1-thread-2 is running. pool-1-thread-1 is running.
示例中,使用了Excutors工具类来创建线程池,并提交任务到线程池上运行,然后关闭线程池,这是一个简单的实例。接下来我们将进行ThreadPoolExecutor深入分析。
线程池状态 在线程池中使用了一个AtomicInteger对象来表示线程池的状态和任务的数量。其中Integer是32位,用高三位来表示线程池的状态,至于怎么计算不是这里的重点,这里我们先讲解任务的状态。
线程池中任务的状态有以下5种:
1 2 3 4 5 RUNNING :接收新的任务和处理队列中的任务 SHUTDOWN :不在接收新的任务和但是处理队列中剩余的任务。 STOP :不在接收新的任务,同时不在处理队列中的任务,线程上正在运行的任务也会被中断。 TIDYING :所有的任务已经结束,并且线程的数量为0,线程的状态转换成清理的状态,接下来将会运行terminated()方法。 TERMINATED:erminated()执行完成
状态的转换:
1 2 3 4 5 RUNNING -> SHUTDOWN:调用 shutdown()方法后,也会包含一些回收的处理 (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow() SHUTDOWN -> TIDYING:当线程池和任务队列都为空 STOP -> TIDYING:当线程池为空 TIDYING -> TERMINATED: 当terminated() 方法已经完成
具体的状态转换如上面所示。另外线程池的状态是通过比特为来表示的,使用ctl这个原子变量的高三位来确定的。具体的如下所示。
1 2 3 4 5 private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
源码分析 在进行详细的源码分析之前,我们先看下ThreadPoolExecutor
中的几个比较重要的成员变量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 )); private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock ();private final HashSet<Worker> workers = new HashSet <Worker>();private final Condition termination = mainLock.newCondition();private int largestPoolSize;private long completedTaskCountprivate volatile ThreadFactory threadFactory;private volatile RejectedExecutionHandler handler;private volatile long keepAliveTime;private volatile boolean allowCoreThreadTimeOut;private volatile int corePoolSize;private volatile int maximumPoolSize;
这边重点解释下 corePoolSize
、maximumPoolSize
、workQueue
两个变量,这两个变量涉及到线程池中创建线程个数的一个策略。corePoolSize
: 这个变量我们可以理解为线程池的核心大小,举个例子来说明(corePoolSize假设等于10,maximumPoolSize等于20):
有一个部门,其中有10(corePoolSize)名工人,当有新任务来了后,领导就分配任务给工人去做,每个工人只能做一个任务。
当10个工人都在忙时,新来的任务就要放到队列(workQueue)中等待。
当任务越积累越多,远远超过工人做任务的速度时,领导就想了一个办法:从其他部门借10个工人来,借的数量有一个公式(maximumPoolSize - corePoolSize)来计算。然后把新来的任务分配给借来的工人来做。
但是如果速度还是还不急的话,可能就要采取措施来放弃一些任务了(RejectedExecutionHandler)。 等到一定时间后,任务都完成了,工人比较闲的情况下,就考虑把借来的10个工人还回去(根据keepAliveTime判断)
也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。
任务执行:execute 源码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); } }
说明 :execute()的作用是将任务添加到线程池中执行。它会分为3种情况进行处理:
情况1 – 如果线程池中任务数量小于核心池大小时,即线程池中少于corePoolSize个任务;此时就新建一个线程,并将该任务添加到线程中进行执行。
情况2 – 如果线程池中任务数量大于核心池大小,并且线程池是运行的状态;此时,则将任务添加到阻塞队列中阻塞等待。在该情况下,会再次确认线程池的状态,如果当前线程池处理非运行状态,则从阻塞队列中删除该任务,并进行拒绝。
情况3 – 非以上两种情况。在这种情况下,尝试新建一个线程,并将该任务添加到线程中进行执行。如果执行失败,则通过reject()拒绝该任务。
到这里,大部分朋友应该对任务提交给线程池之后到被执行的整个过程有了一个基本的了解,下面总结一下:
首先,要清楚corePoolSize和maximumPoolSize的含义;
其次,要知道Worker是用来起到什么作用的;
要知道任务提交给线程池之后的处理策略,这里总结一下主要有4点:
如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
如果当前线程池中的线程数目大于等于corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
如果线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于 corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
addWorker源码分析 源码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { final ReentrantLock mainLock = this .mainLock; w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException (); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; }
说明 :
addWorker(Runnable firstTask, boolean core) 的作用是将任务(firstTask)添加到线程池中,并启动该任务。core为true的话,则以corePoolSize为界限,若”线程池中已有任务数量>=corePoolSize”,则返回false;core为false的话,则以maximumPoolSize为界限,若”线程池中已有任务数量>=maximumPoolSize”,则返回false。
addWorker()会先通过for循环不断尝试更新ctl状态,ctl记录了”线程池中任务数量和线程池状态”。更新成功之后,再通过try模块来将任务添加到线程池中,并启动任务所在的线程。
从addWorker()中,我们能清晰的发现:线程池在添加任务时,会创建任务对应的Worker对象;而一个Workder对象包含一个Thread对象。 (01) 通过将Worker对象添加到”线程的workers集合”中,从而实现将任务添加到线程池中。 (02) 通过启动Worker对应的Thread线程,则执行该任务。
addWorkerFailed 如果Worker创建成功,但是没有启动,这时我们需要将从线程对象从works上移除,否则会影响线程池的性能,源码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void addWorkerFailed (Worker w) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (w != null ) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
这时你们会不会有个疑问,一般我们写程序时,如果想让线程一直运行,则会向下面这样写:
1 2 3 4 5 public void run () {while (true ){ } }
但是上面执行流程分析完了,但是没看到ThreadPoolExecutor怎么定义线程一直运行,这时我们就要去分析Worker这个内部类的源码,这里面有我们想要的结果:
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L ; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } protected boolean isHeldExclusively () { return getState() != 0 ; } protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock () { acquire(1 ); } public boolean tryLock () { return tryAcquire(1 ); } public void unlock () { release(1 ); } public boolean isLocked () { return isHeldExclusively(); } void interruptIfStarted () { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
分析:
从上面我们可以看出,Worker继承AQS,实现Runnable接口,继承AQS实现了一个简单的互斥锁,是不想在Worker运行的时候使用外部类的互斥锁,这样可以减少线程的等待。
线程创建时通过外部类的threadFactory来创建的,后面我会讲解这个类,其实很简单
从上面可以看出Worker类的run方法实现实际上是外部类的runWorker方法实现的,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error (x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
从上面代码可以看到这边在循环获取任务,并执行,直到任务全部执行完毕。除了第一个任务,其他任务都是通过getTask()
方法去取,这个方法是ThreadPoolExecutor中的一个方法。我们猜一下,整个类中只有任务缓存队列中保存了任务,应该就是去缓存队列中取了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 Runnable getTask () { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null ; Runnable r; if (state == SHUTDOWN) r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null ) return r; if (workerCanExit()) { if (runState >= SHUTDOWN) interruptIdleWorkers(); return null ; } } catch (InterruptedException ie) { } } }
这里有一个非常巧妙的设计方式,假如我们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给空闲线程执行。但是在这里,并没有采用这样的方式,因为这样会要额外地对任务分派线程进行管理,无形地会增加难度和复杂度,这里直接让执行完任务的线程Worker去任务缓存队列里面取任务来执行,因为每一个Worker里面都包含了一个线程thread。
还需要注意的是,当线程死亡如何处理:
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
对上面的过程进行一个总结
首先的任务执行分为三种情况
当线程池中的线程少于核心线程,则来一个任务创建一个线程
如果线程池中任务数量大于核心池大小,并且线程池是运行的状态;此时,则将任务添加到阻塞队列中阻塞等待。在该情况下,会再次确认线程池的状态,如果当前线程池处理非运行状态,则从阻塞队列中删除该任务,并进行拒绝。
如果添加失败,则在尝试新建一个线程,并将该任务添加到线程中进行执行。如果执行失败,则通过reject()拒绝该任务。
接着创建线程并启动,也就是worker,如果是从上面创建来的,则每一个线程都会有一个任务,这时会直接启动并执行任务。
当任务执行完成,会尝试从队列中获取任务,利用队列的阻塞来阻塞当前线程,
如果阻塞是有限时间的阻塞,也就是设置了最大空闲时间,则会阻塞在有限时间内,超过这个时间,会返回null
当检测到获取任务获取的是null,则结束第三步的任务等待,退出线程。
关闭线程池 shutdown()的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
说明 :shutdown()的作用是关闭线程池。
shutdownNow()源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
任务的提交
submit任务,等待线程池execute
执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中, 并阻塞等待运行结果;
FutureTask任务执行完成后,通过UNSAFE设置waiters相应的waitNode为null,并通过LockSupport类unpark方法唤醒主线程;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class Test { public static void main (String[] args) { ExecutorService es = Executors.newCachedThreadPool(); Future<String> future = es.submit(new Callable <String>() { @Override public String call () throws Exception { try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } return "future result" ; } }); try { String result = future.get(); System.out.println(result); } catch (Exception e) { e.printStackTrace(); } } }
在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。
Callable接口类似于Runnable,只是Runnable没有返回值。
Callable任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即Future可以拿到异步执行任务各种结果;
Future.get方法会导致主线程阻塞,直到Callable任务执行完成;
submit方法 AbstractExecutorService.submit()实现了ExecutorService.submit()可以获取执行完的返回值, 而ThreadPoolExecutor是AbstractExecutorService.submit()的子类,所以submit方法也是ThreadPoolExecutor的方法。
1 2 3 4 5 6 7 <T> Future<T> submit (Callable<T> task) ; <T> Future<T> submit (Runnable task, T result) ; Future<?> submit(Runnable task);
1 2 3 4 5 6 7 8 9 public Future<?> submit(Runnable task) { if (task == null ) throw new NullPointerException (); RunnableFuture<Void> ftask = newTaskFor(task, null ); execute(ftask); return ftask; }
通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法;
FutureTask对象
1 public class FutureTask <V> implements RunnableFuture <V>
可以将FutureTask提交至线程池中等待被执行(通过FutureTask的run方法来执行) 内部状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private volatile int state;private static final int NEW = 0 ;private static final int COMPLETING = 1 ;private static final int NORMAL = 2 ;private static final int EXCEPTIONAL = 3 ;private static final int CANCELLED = 4 ;private static final int INTERRUPTING = 5 ;private static final int INTERRUPTED = 6 ;
内部状态的修改通过sun.misc.Unsafe修改 get方法
1 2 3 4 5 6 public V get () throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false , 0L ); return report(s); }
内部通过awaitDone方法对主线程进行阻塞,具体实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private int awaitDone (boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L ; WaitNode q = null ; boolean queued = false ; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException (); } int s = state; if (s > COMPLETING) { if (q != null ) q.thread = null ; return s; } else if (s == COMPLETING) Thread.yield(); else if (q == null ) q = new WaitNode (); else if (!queued) queued = UNSAFE.compareAndSwapObject(this , waitersOffset,q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L ) { removeWaiter(q); return state; } LockSupport.parkNanos(this , nanos); } else LockSupport.park(this ); } }
如果主线程被中断,则抛出中断异常;
判断FutureTask当前的state,如果大于COMPLETING,说明任务已经执行完成,则直接返回;
如果当前state等于COMPLETING,说明任务已经执行完,这时主线程只需通过yield方法让出cpu资源,等待state变成NORMAL;
通过WaitNode类封装当前线程,并通过UNSAFE添加到waiters链表;
最终通过LockSupport的park或parkNanos挂起线程;
run方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public void run () { if (state != NEW || !UNSAFE.compareAndSwapObject(this , runnerOffset, null , Thread.currentThread())) return ; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true ; } catch (Throwable ex) { result = null ; ran = false ; setException(ex); } if (ran) set(result); } } finally { runner = null ; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
FutureTask.run方法是在线程池中被执行的,而非主线程通过执行Callable任务的call方法; 如果call执行成功,则通过set方法保存结果; 如果call执行有异常,则通过setException保存异常;