线程池程序开发中经常用到,比如网络和本地数据的访问 ,使用线程池能有效的控制线程数量,避免线程的频繁创建和回收。
1.成员变量 1 private final BlockingQueue<Runnable> workQueue;
表示任务队列。
1 private final HashSet<Worker> workers = new HashSet<Worker>();
Worker可以理解为发动机,每个Worker都会开启一个线程,workers表示存放线程的集合。
1 private volatile long keepAliveTime;
keepAliveTime表示线程活跃时间。因为线程池有核心线程个数,有最大线程个数,当线程池中的线程大于核心线程数时,线程的闲置时间超过keepAliveTime将会被回收。
1 2 3 4 5 private volatile int corePoolSize;private volatile int maximumPoolSize;private volatile int poolSize;
分别表示核心线程个数,最大线程个数,当前线程个数。
1 private volatile ThreadFactory threadFactory;
用于创建线程的工厂。
1 2 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
当线程池中的线程数量和任务数量都达到最大值,就会抛出异常给RejectedExecutionHandler处理。
2.核心方法流程
1 2 3 4 5 6 7 8 9 10 public void execute (Runnable command) { if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0 ) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); } }
(1)当当前线程个数小于核心线程个数,则会调用addIfUnderCorePoolSize。 (2)当当前线程个数大于核心线程个数进行下一步处理。
1 2 3 4 5 6 7 8 9 10 11 12 private boolean addIfUnderCorePoolSize (Runnable firstTask) { Thread t = null ; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } return t != null ; }
判断前线程个数小于核心线程个数,true则添加线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private Thread addThread (Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); boolean workerStarted = false ; if (t != null ) { if (t.isAlive()) throw new IllegalThreadStateException(); w.thread = t; workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) largestPoolSize = nt; try { t.start(); workerStarted = true ; } finally { if (!workerStarted) workers.remove(w); } } return t; }
创建新线程并启动。
ThreadPoolExecutor.Worker
1 2 3 4 5 6 7 8 9 10 11 12 13 public void run () { try { hasRun = true ; Runnable task = firstTask; firstTask = null ; while (task != null || (task = getTask()) != null ) { runTask(task); task = null ; } } finally { workerDone(this ); } }
首先运行初始化的第一个任务,后续从队列中获取任务。
ThreadPoolExecutor.Worker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 Runnable getTask () { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null ; Runnable r; if (state == SHUTDOWN) r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null ) return r; if (workerCanExit()) { if (runState >= SHUTDOWN) interruptIdleWorkers(); return null ; } } catch (InterruptedException ie) { } } }
(1)线程已经停止,直接把任务队列中的数据获取出来,不阻塞登录获取。 (2)这里第二个判断条件是回收线程的重要的地方,在约定时间内获取任务,如果没有则返回null。 (3)阻塞等待获取任务。 (4)判断当前线程是否可用回收。
3. 添加任务处理流程 当一个任务通过execute(Runnable)方法欲添加到线程池时: 如果当前线程池中的数量小于corePoolSize,并线程池处于Running状态,创建并添加的任务。 如果当前线程池中的数量等于corePoolSize,并线程池处于Running状态,缓冲队列 workQueue未满,那么任务被放入缓冲队列、等待任务调度执行。 如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量小于maximumPoolSize,新提交任务会创建新线程执行任务。 如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量等于maximumPoolSize,新提交任务由Handler处理。 当线程池中的线程大于corePoolSize时,多余线程空闲时间超过keepAliveTime时,会关闭这部分线程。