Executor框架的使用

在Executor框架中,工作单元包括Runnable和Callable,执行机制由Executor框架提供。

两层调度模型

executor.png

多线程程序将任务分解为多个任务,然后由用户级调度器Executor将这些任务交由Java线程执行。Java线程并不是直接被CPU调度执行,还会映射为操作系统内核线程,由内核调度器将内核线程调度到CPU执行。祥见Java线程和os线程

Exectuor框架的结构

worker.png

manager.png
Executor框架由三大部分组成:

  • 任务,包括被执行任务需要实现的接口,Runnable接口和Callable接口;
  • 任务的执行,任务机制的和讯即接口及其实现类,ThreadPoolExecutor和ScheduledThreadPool;
  • 异步计算的结果,Future及其实现类FutureTask;

Executor框架的使用

use.png
主线程创建一个Runnable或者Callable任务(Executor可以将Runnable类型转换为Callable类型),然后交给Executor执行,主线程通过返回的Future接口,阻塞等待任务执行以后返回结果,也可以在等待过程中取消任务执行。

Executor框架核心类

  • ThreadPoolExecutor,线程池的核心实现类,用来执行被提交的任务。通过工厂类Executors实现三种类型线程池:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//核心线程数和最大工作线程数量相等,避免创建大量线程,适用于服务器负载较重的情况。
//使用无界队列LinkedBlockingQueue作为任务管理队列,意味着线程池中工作线程不会超过核心线程。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

//没有核心线程,最大工作线程取整数的最大值,适用于有比较多短期的小任务场景;
//使用SynchronousQueue作为工作队列,当主线程提交任务速度大于线程处理速度时,会不断创建线程,有可能会因为创建过多线程导致CPU和内存耗尽
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

//只会创建一个工作线程,适用于需要保证顺序的执行各个任务的场景。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

ScheduledThreadPoolExecutor用于在固定延迟后执行任务,通过Executors创建,包括两种类型:

1
2
3
4
5
6
7
8
9
10
//只会创建单个核心线程,工作线程为Integer.MAX_VALUE
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}

//可以设置多个核心线程,工作线程为Integer.MAX_VALUE
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

Runnable接口和Callable接口的实现类,都可以交个线程池执行,不同的是Callable接口可以返回结果.也可以将一个Runnable对象封装为Callable对象:

1
2
3
4
5
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

ScheduledThreadPoolExecutor

1
2
3
4
5
6
7
//使用DelayWorkerQueue作为工作队列,这是一个无界阻塞队列,使用PriorityQueue实现。
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}

schedule.png
主线程向DelayWorkerQueue中添加任务时,任务会被包装为ScheduledFutureTask,线程池中的线程会从队列中取出任务执行。

ScheduledThreadPoolExecutor的实现

1
2
3
4
5
6
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns; //表示这个任务将要被执行的具体时间
this.period = 0; //表示任务执行的间隔周期,
this.sequenceNumber = sequencer.getAndIncrement(); //被添加入任务队列的顺序
}

DelayQueue中的任务是ScheduledFutureTask类型,包括三个成员变量。DelayQueue封装了一个PriorityQueue,会根据ScheduledFutureTask的time和sqquenceNumber进行排序。线程池中的线程会从任务队列中取出time大于当前时间的任务进行执行。在执行结束以后,会更新time,重新将任务放回队列之中。

1
2
3
4
5
6
7
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}

FutureTask

由于FutureTask继承了Future接口和Runnable接口,所以可以把一个FutureTask接口交由实现了Executor的线程池执行,也可以作为计算结果返回,然后执行FutureTask.get()阻塞当前线程等待返回计算结果。

1
<T> Future<T> submit(Runnable task, T result);

FutureTask是基于AbstractQueuedSynchronizer(AQS)实现的,很多可阻塞类都是基于AQS实现的,AQS是一个原子框架,提供了通用机制来原子性的管理状态,阻塞和唤醒线程,以及维护被阻塞线程的队列。对于很多阻塞类,其具体操作都会委托给实现了AQS的内部类Sync,由Sync进行具体的操作。

aqs.png

文章作者: hohnor
文章链接: http://www.zhulk3.cn/2022/02/10/Executor框架的使用/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 甜茶不贵