本文只是先简单的介绍下线程池的整体架构,对整体有一个清晰的认识,接着演示一个简单的线程池使用案例。
线程池类图如下:
Executor
Executor执行提交的任务,解耦任务的执行和创建,它提供了执行的接口,是来执行任务的。只要提交的任务实现了Runnable接口,就可以将此任务交给Executor来执行,而不用在向之前一样显示的创建线程来执行任务。
这个接口只包含一个函数,代码如下:
1 2 3 4
| public interface Executor{ void execute(Runnable command) }
|
ExecutorService
通过Executor来实现任务的提交与运行,Executor会创建线程来执行任务。但JVM只有在所有非守护线程全部终止后才会退出,因此如果无法正确的关闭Executor,那么JVM将无法结束。此外我们还希望线程池有一些关闭线程,批量提交任务等功能,这个Executor是无法做的。因此使用ExecutorService接口来丰富线程池的功能,但是为什么这么设计,设计的合理性我自己看来是不合理的,如果你觉得我说的不对,可以在下面评论区评论发表意见。
ExecutorService源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| boolean awaitTermination(long timeout, TimeUnit unit)
<T List<Future<T invokeAll(Collection<? extends Callable<T tasks)
<T List<Future<T invokeAll(Collection<? extends Callable<T tasks, long timeout, TimeUnit unit)
<T T invokeAny(Collection<? extends Callable<T tasks)
<T T invokeAny(Collection<? extends Callable<T tasks, long timeout, TimeUnit unit)
boolean isShutdown()
boolean isTerminated()
void shutdown()
List<Runnable shutdownNow()
<T Future<T submit(Callable<T task)
Future<? submit(Runnable task)
<T Future<T submit(Runnable task, T result)
|
关闭任务的方式:
- 直接关闭,相当于断开电源
- 执行完所有当前线程上执行的任务,不在接收新的任务。然后关闭
Executor接口定义的方法不足以满足这些要求的实现,所以有了ExecutorService接口,添加了一些用于生命周期管理的方法(同时还有一些用于任务提交的便利方法)。
ExecutorService的生命周期有三种状态:运行、关闭和已终止。
- 在初始创建时处于运行状态,
- shutdown方法将执行平缓的关闭状态:不在接收新的任务,同时等待已经提交的任务执行完成,包括那些还未开始执行的任务
- shutdownNow方法将执行粗暴的关闭过程,将尝试取消所有运行中的任务。通过isTerminated来确定线程池是否终止,终止后不同拒绝策略有不同的返回结果的方式。
AbstractExecutorService
AbstractExecutorService是一个抽象类,实现了ExecutorService接口。AbstractExecutorService存在的目的是为ExecutorService中的函数接口提供了默认实现。方便我们定制线程池。这个类的方法和ExecutorService一样,所有就不列出来,后面再分析线程池源码时我会在来说这个类。
ThreadPoolExecutor
ThreadPoolExecutor就是大名鼎鼎的”线程池”,它继承于AbstractExecutorService抽象类。是线程池的主要实现类,也是我们后面关注的重点,因此就现在这里提一下,后面会仔细讲。
ScheduledExecutorService
ScheduledExecutorService是一个接口,它继承于于ExecutorService。它提供了延时和周期执行功能的ExecutorService。可以取代原有的定时任务类Timer。 ScheduledExecutorService提供了相应的函数接口,可以安排任务在给定的延迟后执行,也可以让任务周期的执行。
ScheduledExecutorService函数列表
1 2 3 4 5 6 7 8
| <V ScheduledFuture<V schedule(Callable<V callable, long delay, TimeUnit unit)
ScheduledFuture<? schedule(Runnable command, long delay, TimeUnit unit)
ScheduledFuture<? scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
ScheduledFuture<? scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
|
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor继承于ThreadPoolExecutor,并且实现了ScheduledExecutorService接口。它相当于提供了”延时”和”周期执行”功能的ExecutorService。 ScheduledThreadPoolExecutor类似于Timer,但是在高并发程序中,ScheduledThreadPoolExecutor的性能要优于Timer。
在没有此接口之前,我们使用Timer来做定时任务,Timer定时任务的缺陷:
- 执行定时任务时,只创建一个线程,因此如果某个任务执行时间过长,会导致其他定时任务的执行周期加长
- 由于只创建了一个线程,当这个线程因为异常关闭之后,其他定时任务就无法启动。(这个问题称之为线程泄漏)
因此在5.0 之后很少使用这个类来做定时任务,换成了ScheduledThreadPoolExecutor来做定时任务。
同时要构建自己的调度任务还需要队列的支持,这时可以使用DelayQueue,他实现了BlockingQueue,并为ScheduledThreadPoolExecutor提供调度功能,DelayQUeue管理者一组Delayed对象,每个Delayed对象都有一个相应的延迟时间。在DelayQueue中,只有某个元素逾期后,才能从这个队列中take操作。
函数列表:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| ScheduledThreadPoolExecutor(int corePoolSize)
ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)
protected <V RunnableScheduledFuture<V decorateTask(Callable<V callable, RunnableScheduledFuture<V task)
protected <V RunnableScheduledFuture<V decorateTask(Runnable runnable, RunnableScheduledFuture<V task)
void execute(Runnable command)
boolean getContinueExistingPeriodicTasksAfterShutdownPolicy()
boolean getExecuteExistingDelayedTasksAfterShutdownPolicy()
BlockingQueue<Runnable getQueue()
boolean remove(Runnable task)
<V ScheduledFuture<V schedule(Callable<V callable, long delay, TimeUnit unit)
ScheduledFuture<? schedule(Runnable command, long delay, TimeUnit unit)
ScheduledFuture<? scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
ScheduledFuture<? scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)
void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)
void shutdown()
List<Runnable shutdownNow()
<T Future<T submit(Callable<T task)
Future<? submit(Runnable task)
<T Future<T submit(Runnable task, T result)
|
Executors
Executors是个静态工厂类。它通过静态工厂方法返回ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 等类的对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| static Callable<Object callable(PrivilegedAction<? action)
static Callable<Object callable(PrivilegedExceptionAction<? action)
static Callable<Object callable(Runnable task)
static <T Callable<T callable(Runnable task, T result)
static ThreadFactory defaultThreadFactory()
static ExecutorService newCachedThreadPool()
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
static ExecutorService newFixedThreadPool(int nThreads)
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
static ExecutorService newSingleThreadExecutor()
static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
static ScheduledExecutorService newSingleThreadScheduledExecutor()
static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
static <T Callable<T privilegedCallable(Callable<T callable)
static <T Callable<T privilegedCallableUsingCurrentClassLoader(Callable<T callable)
static ThreadFactory privilegedThreadFactory()
static ExecutorService unconfigurableExecutorService(ExecutorService executor)
static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor)
|
简单演示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService;
public class ThreadPoolDemo1 {
public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(2); Thread ta = new MyThread(); Thread tb = new MyThread(); Thread tc = new MyThread(); Thread td = new MyThread(); Thread te = new MyThread(); pool.execute(ta); pool.execute(tb); pool.execute(tc); pool.execute(td); pool.execute(te); pool.shutdown(); } }
class MyThread extends Thread {
@Override public void run() { System.out.println(Thread.currentThread().getName()+ " is running."); } }
|
运行结果:
1 2 3 4 5
| pool-1-thread-1 is running. pool-1-thread-2 is running. pool-1-thread-1 is running. pool-1-thread-2 is running. pool-1-thread-1 is running.
|
结果说明:
主线程中创建了线程池pool,线程池的容量是2。即,线程池中最多能同时运行2个线程。紧接着,将ta,tb,tc,td,te这3个线程添加到线程池中运行。最后,通过shutdown()关闭线程池。