Java 線程池知識點小結
在當今軟件開發(fā)領域,高性能和高并發(fā)處理能力成為了衡量應用程序質量的重要標準之一。Java 作為一門廣泛使用的編程語言,在支持多線程和并發(fā)編程方面提供了強大的工具——線程池。本文將深入探討 Java 線程池的原理、實現(xiàn)方式及其優(yōu)化技巧,旨在幫助開發(fā)者更好地理解和利用這一重要機制,從而構建更加高效穩(wěn)定的并發(fā)系統(tǒng)。
一、詳解線程池核心知識點
1. 為什么需要線程池
我們可以從性能、資源、安全等角度來回答這個問題:
- 提高響應速度:從性能角度來說,通過線程池進行池化統(tǒng)一管理線程,使用時直接通過線程池獲取,不再需要手動創(chuàng)建線程,響應速度大大提高。
- 降低資源消耗:從資源消耗的角度,由于線程池被池化管理了,我們無需為了某些功能去手動創(chuàng)建和銷毀線程,資源消耗自然降低。
- 便于管理和監(jiān)控:因為我們的工作線程都來自于線程池中所以對于線程的監(jiān)控和管理自然方便了許多。
2. 線程池使用示例
接下來我們展示了一個非常簡單的demo,創(chuàng)建一個含有3個線程的線程,提交3個任務到線程池中,讓線程池中的線程池執(zhí)行。 完成后通過shutdown停止線程池,線程池收到通知后會將手頭的任務都執(zhí)行完,再將線程池停止,所以筆者這里使用isTerminated判斷線程池是否完全停止了。只有狀態(tài)為terminated才能說明線程池關閉了,結束循環(huán),退出方法。
//創(chuàng)建含有3個線程的線程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
//提交3個任務到線程池中
for (int i = 0; i < 3; i++) {
finalint taskNo = i;
threadPool.execute(() -> {
log.info("執(zhí)行任務{}", taskNo);
});
}
//關閉線程池
threadPool.shutdown();
//如果線程池還沒達到Terminated狀態(tài),說明線程池中還有任務沒有執(zhí)行完,則繼續(xù)循環(huán)等待線程池執(zhí)行完任務
while (!threadPool.isTerminated()) {
}
對應輸出結果如下:
10:38:51.993 [pool-1-thread-3] INFO com.sharkChili.Main - 執(zhí)行任務2
10:38:51.993 [pool-1-thread-2] INFO com.sharkChili.Main - 執(zhí)行任務1
10:38:51.993 [pool-1-thread-1] INFO com.sharkChili.Main - 執(zhí)行任務0
3. 詳解線程池核心參數(shù)
我們上文通過Executors框架創(chuàng)建了線程池,它底層是通過ThreadPoolExecutor完成線程池的創(chuàng)建:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可以看到ThreadPoolExecutor的構造方法包含下面幾個參數(shù),它們分別是:
- corePoolSize:核心線程數(shù),即時空閑也會保留在線程池中的線程。
- maximumPoolSize:線程池允許創(chuàng)建的最大線程數(shù),例如配置為10,那么線程池中最大的線程數(shù)就為10。
- keepAliveTime:核:心線程數(shù)以外的線程的生存時間,例如corePoolSize為2,maximumPoolSize為5,假如我們線程池中有5個線程,核心線程以外有3個,這3個線程如果在keepAliveTime的時間內沒有被用到就會被回收。
- unit:keepAliveTime的時間單位。
- workQueue:當核心線程都在忙碌時,任務都會先放到隊列中。
- threadFactory:線程工廠,用戶可以通過這個參數(shù)指定創(chuàng)建線程的線程工廠。
- handler:當線程池無法接受新的任務時,就會根據(jù)這個參數(shù)做出拒絕策略,默認拒絕策略是直接拋異常。
對應的構造方法如下所示:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//......
}
4. 線程池的工作流程
從ThreadPoolExecutor的execute方法我們大體可以窺探到其內部核心邏輯:
- 如果工作的線程小于核心線程數(shù),則調用addWorker創(chuàng)建線程并執(zhí)行我們傳入的任務。
- 如果核心線程都在工作,則調用workQueue.offer(command)將我們提交的任務放到隊列中。
- 如果隊列也無法容納任務時,則繼續(xù)創(chuàng)建線程并用這些線程處理新進來的任務。
- 如果還有新的任務接入且當線程數(shù)達到maximumPoolSize時,說明已經無法容納任務了,則調用reject(command)按照拒絕策略處理任務。
對應的我們給出execute的源碼核心邏輯,讀者可自行參閱:
public void execute(Runnable command) {
//......
//工作線程數(shù)小于核心線程數(shù),則創(chuàng)建線程線程處理傳入的任務
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//核心線程都在工作則將任務存入阻塞隊列
//......
}
elseif (!addWorker(command, false))//隊列無法容納則繼續(xù)創(chuàng)建線程應急處理,如果創(chuàng)建失敗說明當前線程超過maximumPoolSize,則調用reject按照拒絕策略處理任務
reject(command);
}
5. 線程池的幾種狀態(tài)
ThreadPoolExecutor通過2的次冪并結合位運算標識了幾種線程的狀態(tài),這種做法也是計算機程序設計中常見的實用技巧,即通過位運算標識替代常規(guī)數(shù)值比對,保證狀態(tài)唯一性的同時還能保證程序的執(zhí)行效率,對應的線程池狀態(tài)和注釋如下:
//RUNNING 說明線程正處于運行狀態(tài),正在處理任務和接受新的任務進來
privatestaticfinalint RUNNING = -1 << COUNT_BITS;
//說明線程收到關閉的通知了,繼續(xù)處理手頭任務,但不接受新任務
privatestaticfinalint SHUTDOWN = 0 << COUNT_BITS;
//STOP說明線程停止了不處理任務也不接受任務,即時隊列中有任務,我們也會將其打斷。
privatestaticfinalint STOP = 1 << COUNT_BITS;
//表明所有任務都已經停止,記錄的任務數(shù)量為0
privatestaticfinalint TIDYING = 2 << COUNT_BITS;
//線程池完全停止了
privatestaticfinalint TERMINATED = 3 << COUNT_BITS;
6. 線程池的幾種拒絕策略
- AbortPolicy:這個拒絕策略在無法容納新任務的時候直接拋出異常,這種策略是線程池默認的拒絕策略。
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
- CallerRunsPolicy:從源碼中可以看出,當線程池無法容納新任務的時,會直接將當前任務交給調用者執(zhí)行。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//......
//讓當前提交任務的線程運行
if (!e.isShutdown()) {
r.run();
}
}
}
- DiscardOldestPolicy:顧名思義,當線程池無法最新任務時,會將隊首的任務丟棄,將新任務存入。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
//將隊首元素poll掉,并將當前任務提交
e.getQueue().poll();
e.execute(r);
}
}
}
DiscardPolicy:從源碼中可以看出這個策略什么也不做,相當于直接將當前任務丟棄。
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
//什么都不做直接即丟棄當前任務
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
6. 線程兩種任務提交方式
首先是execute,當任務提交到線程池中時直接按照流程執(zhí)行即可,處理完成后是沒有返回值的源碼上文已經給出這里就不多贅述。而submit它會將傳進來的任務封裝成RunnableFuture,然后將Future返回出去,調用者可以通過get方法獲取返回結果:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
//提交任務
execute(ftask);
//返回Future,后續(xù)我們可以通過get獲取結果
return ftask;
}
對應的我們也給出使用示例:
@Test
void baseUse() throws ExecutionException, InterruptedException {
//創(chuàng)建含有3個線程的線程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
//提交3個任務到線程池中
for (int i = 0; i < 3; i++) {
finalint taskNo = i;
Future<Integer> future = threadPool.submit(() -> {
logger.info("執(zhí)行任務{}", taskNo);
return1;
});
logger.info("處理結果:{}", future.get());
}
//關閉線程池
threadPool.shutdown();
//如果線程池還沒達到Terminated狀態(tài),說明線程池中還有任務沒有執(zhí)行完,則繼續(xù)循環(huán)等待線程池執(zhí)行完任務
while (!threadPool.isTerminated()) {
}
}
輸出結果:
00:24:41.204 [pool-1-thread-1] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 執(zhí)行任務0
00:24:41.208 [main] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 處理結果:1
00:24:41.209 [pool-1-thread-2] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 執(zhí)行任務1
00:24:41.209 [main] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 處理結果:1
00:24:41.209 [pool-1-thread-3] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 執(zhí)行任務2
00:24:41.209 [main] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 處理結果:1
7. 線程池的關閉方式
線程池的停止方式有兩種:
- shutdown:筆者上述代碼示例用的都是這種方式,使用這個方法之后,我們無法提交新的任務進來,線程池會繼續(xù)工作,將手頭的任務執(zhí)行完再停止:
public void shutdown() {
//上鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//檢查并設置狀態(tài),不再接受新任務
checkShutdownAccess();
advanceRunState(SHUTDOWN);
//打斷空閑的線程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//停止線程池
tryTerminate();
}
- shutdownNow:這種停止方式就比較粗暴了,線程池會直接將手頭的任務都強行停止,且不接受新任務進來,線程停止立即生效:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
//上鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//設置狀態(tài)為stop強行停止
checkShutdownAccess();
advanceRunState(STOP);
//打斷空閑線程
interruptWorkers();
//移除隊列中的任務
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//停止線程池
tryTerminate();
return tasks;
}
8. 非核心線程創(chuàng)建與調度饑餓問題
了解了線程池整體工作原理后,讀者是否想過,為什么先要用corePoolSize核心線程,然后當核心線程處理不過來時將異步任務先放到workQueue中,而不是直接開maximumPoolSize的線程數(shù)繼續(xù)處理應急任務呢?如果按照當前的線程池執(zhí)行流程不就存在一個饑餓問題?即后來的任務可能會比存在于隊列中的任務先執(zhí)行:
其實回答這個問題,其實我們可以通過反證法來解釋,假設任務處理不過來之后,直接創(chuàng)建maximumPoolSize個線程處理任務,那么就會存在以下幾個問題:
- 僅僅因為核心線程數(shù)處理不來任務就認為是應急情況,這會導致應急線程被提前創(chuàng)建,這就可能存在頻繁創(chuàng)建和銷毀線程的性能損耗,例如核心線程為4,剛剛好某個時間段剛剛好來個5個異步任務,僅僅因為多了一個任務,在沒有任何緩沖的情況下,直接創(chuàng)建應急線程然后被銷毀,這就會導致這種不合理的性能損耗。
- 資源消耗:創(chuàng)建完最大線程之后,線程有可能處于空閑中,這也不能意味著線程沒有任何開銷,一旦線程被啟動對于CPU、內存而言都是存在一定的資源開銷的,如果maximumPoolSize線程數(shù)過大,對于系統(tǒng)資源占用也是非常不劃算的。
總的來說,設計者們認為只有緩沖區(qū)處理不來(隊列容納不下)的情況下才能開啟應急線程是一種對于應急情況的判斷依據(jù),由此避免了非應急情況創(chuàng)建應急線程的開銷:
//存入阻塞隊列失敗后,才會嘗試調用addWorker開啟非核心線程,即通過阻塞隊列的閾值來作為應急情況判斷的依據(jù)
if (isRunning(c) && workQueue.offer(command)) {
//......
}
else if (!addWorker(command, false))//如果非核心線程開啟失敗,則執(zhí)行拒絕策略
reject(command);
再來回答另外一個問題即任務饑餓問題,,這確實會存在一定情況下存在饑餓問題,但筆者認為該問題只要線程池參數(shù)設定得當,在非核心線程啟動之后,這些堆積在阻塞隊列的任務在一定時間后就會被任意抽身出來的線程從隊列中取出并處理,對應的我們給出每一個worker線程的執(zhí)行邏輯即ThreadPoolExecutor的runWorker方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//......
try {
//從阻塞隊列中獲取任務task
while (task != null || (task = getTask()) != null) {
//......
try {
//......
try {
//執(zhí)行任務
task.run();
} catch (RuntimeException x) {
//......
} finally {
afterExecute(task, thrown);
}
} finally {
//......
}
}
//......
} finally {
processWorkerExit(w, completedAbruptly);
}
}
當然如果讀者對于任務公平有著嚴格要求同時系統(tǒng)資源也足夠充分,完全可以考慮通過Executors.newSingleThreadExecutor()這種只有一個線程的線程輪詢處理阻塞隊列的任務模式,來保證異步任務順序性和公平性:
對應的我們也給出singleThreadExecutor的核心實現(xiàn),讀者也可以參考源碼了解一下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
二、線程池使用注意事項
1. 避免使用Executors的newFixedThreadPool
接下來我們來看看日常使用線程池時一些錯誤示例,為了更好的看到線程池的變化,我們編寫這樣一個定時任務去監(jiān)控線程池的變化。
/**
* 打印線程池情況
*
* @param threadPool
*/
private void printStats(ThreadPoolExecutor threadPool) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
log.info("=========================");
log.info("Pool Size:{}", threadPool.getPoolSize());
log.info("Active Threads:{}", threadPool.getActiveCount());
log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount());
log.info("Number of Tasks in Queue:{}", threadPool.getQueue().size());
log.info("=========================");
}, 0, 1, TimeUnit.SECONDS);
}
先來看看這樣一段代碼,我們循環(huán)1e次,每次創(chuàng)建這樣一個任務:生成一串大字符串,休眠一小時后打印輸出。
@GetMapping("oom1")
public void oom1() {
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
printStats(threadPool);
for (int i = 0; i < 1_0000_0000; i++) {
threadPool.submit(() -> {
String payload = IntStream.rangeClosed(1, 100_0000)
.mapToObj(__ -> "a")
.collect(Collectors.joining("")) + UUID.randomUUID().toString();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(payload);
});
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
}
項目啟動后使用jvisualvm監(jiān)控項目的變化:
可以看到此時CPU使用情況,堆區(qū)、還有線程數(shù)使用情況都是正常的。
然后我們對剛剛的接口發(fā)起請求:
curl http://localhost:8080/threadpooloom/oom1
我們先來看看控制臺輸出,可以看到線程數(shù)沒有增加,而隊列的任務卻不斷累積。
看看jvisualvm,此時堆區(qū)內存不斷增加,盡管發(fā)生了幾次GC,還是沒有回收到足夠的空間。最終引發(fā)OOM問題。
我們通過源碼來觀察一下newFixedThreadPool的特征,可以看到它的核心線程數(shù)和最大線程數(shù)都是傳進來的值,這意味著無論多少個任務進來,線程數(shù)都是nThreads。如果我們沒有足夠的線程去執(zhí)行的任務的話,任務就會堆到LinkedBlockingQueue中,從源碼中我們也能看出,LinkedBlockingQueue是無界隊列(底層是通過鏈表實現(xiàn)的):
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2. 避免使用Executors的newCachedThreadPool
再來看看第二段代碼,同樣的任務提交到newCachedThreadPool中,我們看看會發(fā)生什么。
@GetMapping("oom2")
public void oom2() {
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
printStats(threadPool);
for (int i = 0; i < 1_0000_0000; i++) {
threadPool.submit(() -> {
String payload = IntStream.rangeClosed(1, 100_0000)
.mapToObj(__ -> "b")
.collect(Collectors.joining("")) + UUID.randomUUID().toString();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(payload);
});
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
}
先來看看控制臺,可以看到線程數(shù)正在不斷的飆升。
從jvisualvm也能看出堆區(qū)和線程數(shù)也在不斷飆升,最終導致OOM。
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 32744 bytes for ChunkPool::allocate
# An error report file with more information is saved as:
# F:\github\java-common-mistakes-100\hs_err_pid147400.log
我們來看看newCachedThreadPool源碼,可以看到這個線程池核心線程數(shù)初始為0,最大線程數(shù)為Integer.MAX_VALUE,而隊列使用的是SynchronousQueue,所以這個隊列等于不會存儲任何任務。
這就意味著我們每次提交一個任務沒有線程處理的話,線程池就會創(chuàng)建一個新的線程去處理這個任務,1s內沒有線程使用就將其銷毀。
我們的連續(xù)1e次循環(huán)提交任務就會導致創(chuàng)建1e個線程,最終導致線程數(shù)飆升,進而引發(fā)OOM問題。
public static ExecutorService newCachedThreadPool() {
//隊列不可容納元素,最大線程數(shù)設置為Integer.MAX_VALUE
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
3. 確保你創(chuàng)建線程池的方式線程可以被復用
我們監(jiān)控發(fā)現(xiàn)某段時間線程會不斷飆升,然后急速下降,然后急速上升:
然后我們在線程的棧幀中看到SynchronousQueue,大概率有人使用newCachedThreadPool。
最終通過全局搜索看到這樣一段代碼,可以看到這個工具類每次請求就會創(chuàng)建一個newCachedThreadPool給用戶使用。
static class ThreadPoolHelper {
public static ThreadPoolExecutor getThreadPool() {
return (ThreadPoolExecutor) Executors.newCachedThreadPool();
}
}
我們在定位到調用出,真想明了了,原來每一次請求都會創(chuàng)建一個newCachedThreadPool處理大量的任務,由于newCachedThreadPool回收時間為1s,所以線程使用完之后立刻就被回收了。
@GetMapping("wrong")
public String wrong() {
ThreadPoolExecutor threadPool = ThreadPoolHelper.getThreadPool();
IntStream.rangeClosed(1, 20).forEach(i -> {
threadPool.execute(() -> {
String payload = IntStream.rangeClosed(1, 1000000)
.mapToObj(__ -> "a")
.collect(Collectors.joining("")) + UUID.randomUUID().toString();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
log.debug(payload);
});
});
return"ok";
}
解決方式也很簡單,我們按需調整線程池參數(shù),將線程池作為靜態(tài)變量全局復用即可。
static class ThreadPoolHelper {
privatestatic ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10,
50,
2,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get());
public static ThreadPoolExecutor getRightThreadPool() {
return threadPool;
}
}
從監(jiān)控來看線程數(shù)正常多了。
4. 仔細斟酌線程混用策略
我們使用線程池來處理一些異步任務,每個任務耗時10ms左右。
@GetMapping("wrong")
public int wrong() throws ExecutionException, InterruptedException {
return threadPool.submit(calcTask()).get();
}
private Callable<Integer> calcTask() {
return () -> {
log.info("執(zhí)行異步任務");
TimeUnit.MILLISECONDS.sleep(10);
return1;
};
}
壓測的時候發(fā)現(xiàn)性能很差,處理時間最長要283ms。
步入線程池發(fā)現(xiàn),線程池的配置如下,只有2個線程和50個隊列。
private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
2,
1,
TimeUnit.HOURS,
new ArrayBlockingQueue<>(50),
new ThreadFactoryBuilder().setNameFormat("batchfileprocess-threadpool-%d").get(),
new ThreadPoolExecutor.CallerRunsPolicy());
查看調用發(fā)現(xiàn),原來后臺有一個處理字符串并將內容寫入到文本文件的操作,綜合來看相當于一個計算型任務,由于這個任務不是很經常出現(xiàn),所以開發(fā)者就設置兩個線程,并且為了讓任務能夠正確完成,拒絕策略也是使用CallerRunsPolicy,讓多出來的任務用調用者線程來執(zhí)行。
@PostConstruct
public void init() {
printStats(threadPool);
new Thread(() -> {
String payload = IntStream.rangeClosed(1, 100_0000)
.mapToObj(__ -> "a")
.collect(Collectors.joining(""));
while (true) {
threadPool.execute(() -> {
try {
Files.write(Paths.get("demo.txt"), Collections.singletonList(LocalTime.now().toString() + ":" + payload), UTF_8, CREATE, TRUNCATE_EXISTING);
} catch (IOException e) {
e.printStackTrace();
}
// log.info("batch file processing done");
});
}
}, "T1").start();
}
解決方式也很簡單,上述線程池并不是為我們這種IO密集型任務準備的,所以我們單獨為其劃分一個線程池出來處理這些任務。
private ThreadPoolExecutor asyncCalcThreadPool = new ThreadPoolExecutor(200,
200,
1,
TimeUnit.HOURS,
new ArrayBlockingQueue<>(50),
new ThreadFactoryBuilder().setNameFormat("asynccalc-threadpool-%d").get(),
new ThreadPoolExecutor.CallerRunsPolicy());
@GetMapping("wrong")
public int wrong() throws ExecutionException, InterruptedException {
return asyncCalcThreadPool.submit(calcTask()).get();
}
經過壓測可以發(fā)現(xiàn)性能明顯上來了:
5. 使用正確的方式提交任務
假如我們提交給線程池的任務沒有返回值,我們建議使用execute。
這一點我們不妨看一下這樣一段代碼,該代碼會循環(huán)提交10個算術異常的任務給線程池。可以看到我們提交的任務是沒有返回值的,而我們提交任務時卻用到了submit。使用submit提交任務時,會返回一個Future對象,通過Future對象我們可以使用get方法阻塞獲取任務返回結果。
因為我們的任務是沒有返回值的,所以我們提交過程中并沒有通過get方法獲取返回結果,這就導致了一個隱患——吞異常。
private static ExecutorService threadPool = Executors.newFixedThreadPool(1);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
//提交一個算術異常的任務
threadPool.submit(() -> {
log.info("開始執(zhí)行運算");
int r = 1 / 0;
log.info("結束執(zhí)行運算");
});
}
//等待線程池關閉
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
}
可以看到這段代碼的輸出結果如下,控制臺僅僅輸出線程開始工作,卻沒有輸出結果。
09:15:36.940 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運算
這一點,我們通過查看FutureTask的run源碼可以得知,F(xiàn)utureTask的run方法執(zhí)行步驟如下:
- 調用call方法,執(zhí)行任務。
- 得到result后將ran設置為true。
- 如果執(zhí)行過程中報錯,直接進入catch模塊,將result設置為null,并將ran設置為false。
- 調用setException處理異常。
try { //執(zhí)行任務,返回一個結果賦值給result
result = c.call();
ran = true;
} catch (Throwable ex) {
//任務拋出異常后,將result設置為null,ran狀態(tài)設置為false,并調用setException處理異常
result = null;
ran = false;
setException(ex);
}
步入代碼查看setException我們可以發(fā)現(xiàn),它會將異常結果賦值給outcome然后調用finishCompletion結束任務,所以如果我們沒有主動獲取任務結果,那么這個錯誤就永遠不會被感知。
protected void setException(Throwable t) {
//通過cas將結果設置為完成(COMPLETING)值為1
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//將異常賦值給outcome直接將任務結束
outcome = t;
//通過cas將結果設置為異常(EXCEPTIONAL)值為3
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
針對上述問題,要想獲取異常也很簡單,主動調用get獲取結果即可:
private static ExecutorService threadPool = Executors.newFixedThreadPool(1);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
//提交一個算術異常的任務
Future<?> future = threadPool.submit(() -> {
System.out.println(Thread.currentThread().getName() + " do working");
int r = 1 / 0;
System.out.println(r);
});
try {
//通過get阻塞獲取任務結果
Object o = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
threadPool.shutdown();
while (!threadPool.isTerminated()){
}
}
從輸出結果可以看到出現(xiàn)異常后,錯誤直接拋出,我們就可以及時調試處理了。
pool-1-thread-1do working
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
pool-1-thread-1do working
pool-1-thread-1do working
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
pool-1-thread-1do working
pool-1-thread-1do working
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
pool-1-thread-1do working
at com.sharkChili.threadpool.Main.main(Main.java:23)
Caused by: java.lang.ArithmeticException: / by zero
at com.sharkChili.threadpool.Main.lambda$main$0(Main.java:17)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
為什么調用get才能捕獲到異常呢?通過查看get源碼可以了解到get方法的執(zhí)行步驟:
- 獲取任務執(zhí)行狀態(tài)state。
- 如果state小于COMPLETING(COMPLETING值為1)說明任務未完成,則調用awaitDone等待任務完成。
- 如果大于1則說明任務已完成(),通過上文源碼可知我們的任務已經被CAS設置為EXCEPTIONAL(值為3),所以直接調用report。
public V get() throws InterruptedException, ExecutionException {
int s = state;
//如果s小于1則說命未完成,調用awaitDone等待完成,在調用report
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
查看report代碼我們終于知道原因了,我們任務執(zhí)行報錯所以s的值為3,小于CANCELLED,所以調用了最后一段代碼將異常拋出了。
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
通過上述我們知道的submit使用不當可能存在吞異常的情況以及應對辦法,實際上對于沒有返回值的任務,我們建議直接使用execute,execute感知異常時會直接將任務拋出:
private static ExecutorService threadPool = Executors.newFixedThreadPool(1);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
//提交一個算術異常的任務
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " do working");
int r = 1 / 0;
System.out.println(r);
});
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
}
從輸出結果來看,算術異常直接拋出,被主線程感知了。
pool-1-thread-1do working
Exception in thread "pool-1-thread-1" Exception in thread "pool-1-thread-2" java.lang.ArithmeticException: / by zero
pool-1-thread-2do working
pool-1-thread-3do working
at com.sharkChili.threadpool.Main.lambda$main$0(Main.java:17)
pool-1-thread-4do working
pool-1-thread-5do working
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
pool-1-thread-6do working
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
pool-1-thread-7do working
通過查看sumbit執(zhí)行源碼,我們可以看到代碼調用棧會來到ThreadPoolExecutor的runWorker下面這個代碼段的邏輯:
- 調用run執(zhí)行任務。
- afterExecute收尾任務。
- 如果感知異常則拋出異常throw x。
- 所以我們的任務會因為算術異常而拋出任務。
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執(zhí)行算數(shù)任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; thrownew Error(x);
} finally {
afterExecute(task, thrown);
}
最終代碼被JVM感知直接將異常拋到控制臺,所以對于沒有返回值的任務,我們建議使用execute執(zhí)行任務。
private void dispatchUncaughtException(Throwable e) {
getUncaughtExceptionHandler().uncaughtException(this, e);
}
6. 避免任務頻繁拋出異常
上文提到使用execute提交無返回值的任務,這樣異常就會被感知,但還需要注意的是頻繁的拋出異常會讓線程消亡,導致線程池每次執(zhí)行新任務時回去創(chuàng)建新的線程。
還是以這段代碼為例,我們對于算術異常沒有任務處理。
private static ExecutorService threadPool = Executors.newFixedThreadPool(1);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
//提交一個算術異常的任務
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " do working");
int r = 1 / 0;
System.out.println(r);
});
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
}
可以看到我們明明只有一個線程的線程池,每次拋出異常后,都會創(chuàng)建一個新的線程處理任務。
pool-1-thread-1 do working
pool-1-thread-2 do working
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at com.sharkChili.threadpool.Main.lambda$main$0(Main.java:17)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "pool-1-thread-2" pool-1-thread-3 do working
這一點我們從源碼中可知,拋出的異常被JVM感知并調用dispatchUncaughtException方法,該方法會通過getUncaughtExceptionHandler得到線程組,然后調用uncaughtException處理異常。
private void dispatchUncaughtException(Throwable e) {
getUncaughtExceptionHandler().uncaughtException(this, e);
}
最終代碼會走到e.printStackTrace打印異常堆棧信息并終止任務,銷毀線程。
public void uncaughtException(Thread t, Throwable e) {
if (parent != null) {
parent.uncaughtException(t, e);
} else {
Thread.UncaughtExceptionHandler ueh =
Thread.getDefaultUncaughtExceptionHandler();
if (ueh != null) {
ueh.uncaughtException(t, e);
} elseif (!(e instanceof ThreadDeath)) {
System.err.print("Exception in thread \""
+ t.getName() + "\" ");
e.printStackTrace(System.err);
}
}
}
所以我們建議,對于線程池的中的任務盡可能不要用異常來處理邏輯,對于可以預見的異常,我們建議手動處理返回,避免線程銷毀再創(chuàng)建的開銷。
以我們的算術異常為例,我們可以提前判斷一下被處數(shù)提前用業(yè)務手段處理掉異常。
private static ExecutorService threadPool = Executors.newFixedThreadPool(1);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
//提交一個算術異常的任務
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " do working");
//手動處理業(yè)務代碼的異常
int num= RandomUtil.randomInt(0,10);
if (num==0){
System.out.println("The dividend cannot be zero. ");
return;
}
int r = 1 / num;
System.out.println(r);
});
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
}
三、小結
總結一下上述線程池的使用經驗:
- 避免使用Executors創(chuàng)建線程池。
- 確保線程確實被服用到。
- 使用合適的方式提交任務以及及時處理任務中的異常。
- 確保在合適的場景使用合適的線程池:
CPU密集型:若是CPU密集型,我們希望多利用CPU資源來處理任務,因為沒有任何IO,理想情況線程數(shù)=CPU核心數(shù)即可,但是考慮到可能回出現(xiàn)某個意外情況導致線程阻塞,所以我們建議線程數(shù)=CPU核心數(shù)+1
IO密集型:IO密集型由于每個任務可能回出現(xiàn)IO導致任務阻塞,在單核情況下,我們建議:
線程數(shù)=IO時長/CPU計算耗時+1
若在多核的情況下,我們建議
線程數(shù)=CPU核心數(shù) * (IO時長/CPU計算耗時+1)
但是具體情況還要具體結合壓測結果進行響應調整。