The full constructor for creating a ThreadPoolExecutor object is as follows:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
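To make the argument checks described in the Javadoc concrete, here is a minimal sketch that calls the seven-argument constructor and probes which size combinations it rejects. The class name ConstructorDemo, the helpers newPool and rejectsSizes, and the chosen queue, keep-alive time, and rejection policy are illustrative assumptions, not part of the original code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only the ThreadPoolExecutor API itself comes from the JDK.
class ConstructorDemo {

    /** Builds a pool with the full seven-argument constructor. */
    static ThreadPoolExecutor newPool(int corePoolSize, int maximumPoolSize) {
        return new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize,
                30L, TimeUnit.SECONDS,                 // keepAliveTime and its unit (arbitrary values)
                new ArrayBlockingQueue<Runnable>(10),  // bounded work queue (arbitrary capacity)
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()); // throws when the pool is saturated
    }

    /** Returns true if the constructor throws IllegalArgumentException for these sizes. */
    static boolean rejectsSizes(int corePoolSize, int maximumPoolSize) {
        try {
            newPool(corePoolSize, maximumPoolSize).shutdown();
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsSizes(2, 5)); // false: valid sizes
        System.out.println(rejectsSizes(5, 2)); // true: maximumPoolSize < corePoolSize
        System.out.println(rejectsSizes(0, 0)); // true: maximumPoolSize <= 0
    }
}
```

Note that constructing the pool does not start any threads; workers are only created once tasks are submitted.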
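Two parameters deserve a closer look: maximumPoolSize is the maximum number of threads allowed in the pool, and workQueue is the wait queue that receives tasks once the pool already has corePoolSize threads.
Note that the queue is tried before any thread beyond corePoolSize is created. In other words, suppose a thread pool is created with the following code: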
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(5));
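When the pool already has two threads busy executing tasks and a third task is submitted, the pool does not create a non-core thread to run it. Instead, the third task is placed in the LinkedBlockingQueue, where it waits until a thread becomes free.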
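This behavior can be observed directly. The sketch below is only an illustration: the class name QueueFirstDemo and the helper submitBlocking are hypothetical, and the tasks block on a CountDownLatch so that the pool state stays stable while it is inspected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class using the same pool configuration as above.
class QueueFirstDemo {

    /** Submits the given number of blocking tasks and returns {poolSize, queueSize}. */
    static int[] submitBlocking(int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(5));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until we are done measuring
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { executor.getPoolSize(), executor.getQueue().size() };
        } finally {
            release.countDown(); // let all tasks finish
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] state = submitBlocking(3);
        // prints "poolSize=2, queueSize=1": the third task waits in the queue
        System.out.println("poolSize=" + state[0] + ", queueSize=" + state[1]);
    }
}
```

With three tasks, the pool stays at two threads and the third task sits in the queue, even though maximumPoolSize is 5.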
Why is that? Let's look at the execute method of ThreadPoolExecutor:
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@link RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread. If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) { // fewer workers than corePoolSize: start a new core thread with this task as its first task
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) { // otherwise, try to enqueue the task in workQueue
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command)) // recheck: if the pool has been shut down in the meantime, roll back the enqueue and reject
            reject(command);
        else if (workerCountOf(recheck) == 0) // if no workers are left after the task was enqueued, start a worker with no initial task so it pulls tasks from the queue
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) // queuing failed: try to add a non-core worker, and reject the task if that fails too
        reject(command);
}
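As the code comments show, the reason is now clear: once the worker count has reached corePoolSize, execute first tries workQueue.offer, and it only attempts to add a non-core thread when the queue refuses the task.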
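The three steps can also be traced at runtime. The following sketch is an illustration under stated assumptions: the class name ExecuteStepsDemo and the helper trace are hypothetical, the pool uses the same (2, 5, queue capacity 5) configuration as the earlier example, and the default rejection policy (AbortPolicy) is relied on to signal saturation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class that records the pool state after every accepted task.
class ExecuteStepsDemo {

    /** Submits blocking tasks until one is rejected; each entry is "poolSize/queueSize". */
    static List<String> trace() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(5));
        CountDownLatch release = new CountDownLatch(1);
        List<String> states = new ArrayList<>();
        try {
            while (true) {
                executor.execute(() -> {
                    try {
                        release.await(); // block so no worker ever drains the queue
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                states.add(executor.getPoolSize() + "/" + executor.getQueue().size());
            }
        } catch (RejectedExecutionException e) {
            states.add("rejected"); // step 3 failed: pool and queue are both full
        } finally {
            release.countDown();
            executor.shutdown();
        }
        return states;
    }

    public static void main(String[] args) {
        // prints [1/0, 2/0, 2/1, 2/2, 2/3, 2/4, 2/5, 3/5, 4/5, 5/5, rejected]
        System.out.println(trace());
    }
}
```

The first two tasks start core threads (step 1), the next five fill the queue (step 2), the following three start non-core threads up to maximumPoolSize (step 3), and the eleventh task is rejected.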