執行緒池底層佇列詳解

2022-09-16 00:06:28 字數 3789 閱讀 9432

如果執行的執行緒數 < corepoolsize,則 executor始終首選新增新的執行緒,而不進行排隊。即任務根本不會存入queue中,而是直接執行

如果執行的執行緒數 >= corepoolsize,則 executor 始終首選將請求加入佇列,而不新增新的執行緒。 

如果無法將請求加入佇列,則建立新的執行緒,除非建立此執行緒超出 maximumpoolsize,在這種情況下,任務將被拒絕。

針對 blockingqueue workqueue 這個緩衝佇列,在jdk中,其實已經說得很清楚了,一共有三種型別的queue,分別是:

直接提交 synchronousqueue

無界佇列 linkedblockingqueue

有界佇列 arrayblockingqueue

fixedthreadpool 和 singlethreadexecutor 是使用的無界佇列 linkedblockingqueue

cachedthreadpool 是使用的 直接提交佇列 synchronousqueue

而 scheduledthreadpool 使用的是 delayedworkqueue,這種佇列的內部元素會按照延遲時間的長短對任務進行排序,延時時間越短地就排在佇列的前面,越先被執行,他的內部採用的是「堆」的資料結構。scheduledthreadpool 這種型別的執行緒池就用到該佇列。這種執行緒池要的效果是可以延遲的執行任務,是以時間為單位來決定任務的執行順序的,剛好 delayedworkqueue 佇列就有把任務按時間進行排序的能力,所以一拍即合,這兩種執行緒池就使用 delayedworkqueue 佇列了。

1. 直接提交

2. 無界佇列

3. 有界佇列

當使用有限的 maximumpoolsizes時,有界佇列(如 arrayblockingqueue)有助於防止資源耗盡,但是可能較難調整和控制。

佇列大小 和 最大池大小 可能需要相互折衷:使用大型佇列 和 小型池可以最大限度地降低 cpu 使用率、作業系統資源 和 上下文切換開銷,但是可能導致人工降低吞吐量。

如果任務頻繁阻塞(例如,如果它們是 i/o 邊界),則系統可能為超過許可的更多執行緒安排時間。

使用小型佇列通常要求較大的池大小,cpu使用率較高,但是可能遇到不可接受的排程開銷,這樣也會降低吞吐量。

一、使用直接提交策略,也即synchronousqueue

首先 synchronousqueue 是無界的,也就是說他儲存任務的能力是沒有限制的,但是由於該queue本身的特性,在某次新增元素後必須等待其他執行緒取走後才能繼續新增。在這裡不是核心執行緒便是新建立的執行緒,但是我們試想一樣下,下面的場景。

我們使用一下引數構造 threadpoolexecutor:

new

threadpoolexecutor(

2, 3, 30, timeunit.seconds,

new synchronousqueue(),

new recorderthreadfactory("recorderpooltest"),

newthreadpoolexecutor.callerrunspolicy()

);

當核心執行緒已經有2個正在執行.

此時繼續來了乙個 [任務a],根據前面介紹的 「如果執行的執行緒 >= corepoolsize,則 executor 始終首選將請求加入佇列,而不新增新的執行緒。」,所以 [任務a] 被新增到 queue 中。

又來了乙個 [任務b],且核心2個執行緒還沒有跑完,ok,接下來首先嘗試 1 中描述,但是由於使用的 synchronousqueue,所以一定無法加入進去。

此時便滿足了上面提到的 「如果無法將請求加入佇列,則建立新的執行緒,除非建立此執行緒超出 maximumpoolsize,在這種情況下,任務將被拒絕。」,所以必然會新建乙個執行緒來執行這個任務。

暫時還可以,但是如果這三個任務都還沒完成,連續來了兩個任務,第乙個新增入 queue 中,後乙個呢?queue中無法插入,而執行緒數達到了maximumpoolsize,所以只好執行異常策略了。

所以在使用 synchronousqueue 通常要求 maximumpoolsize 是無界的,這樣就可以避免上述情況發生(如果希望限制就直接使用有界佇列)

對於使用 synchronousqueue 的作用jdk中寫的很清楚:此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。

什麼意思?

如果你的 [任務a1] 和 [任務a2] 有內部關聯,[任務a1] 需要先執行,那麼先提交[任務a1],再提交 [任務a2],當使用 synchronousqueue 我們可以保證,[任務a1] 必定先被執行,在 [任務a1] 沒有被執行前,[任務a2] 不可能新增入 queue 中。

二、使用無界佇列策略,即linkedblockingqueue

如果執行的執行緒數 < corepoolsize,則 executor 始終首選新增新的執行緒,而不進行排隊。那麼當任務繼續增加,會發生什麼呢?

如果執行的執行緒數 >= corepoolsize,則 executor 始終首選將請求加入佇列,而不新增新的執行緒。ok,此時任務變加入佇列之中了,那什麼時候才會新增新執行緒呢?

如果無法將請求加入佇列,則建立新的執行緒,除非建立此執行緒超出 maximumpoolsize,在這種情況下,任務將被拒絕。

這裡就很有意思了,可能會出現無法加入佇列嗎?

不像 synchronousqueue 那樣有其自身的特點,對於無界佇列來說,總是可以加入的(資源耗盡,當然另當別論)。換句說,永遠也不會觸發產生新的執行緒!

corepoolsize 大小的執行緒數會一直執行,跑完當前的,就從佇列中拿任務開始執行。所以要防止任務瘋長,比如任務執行的時間比較長,而新增任務的速度遠遠超過處理任務的時間,而且還不斷增加,不一會兒就爆了。

三、有界佇列,使用arrayblockingqueue

這個是最為複雜的使用,所以jdk不推薦使用也有些道理。與上面的相比,最大的特點便是可以防止資源耗盡的情況發生。

舉例來說,請看如下構造方法:

new

threadpoolexecutor(

2, 3, 30, timeunit.seconds,

new arrayblockingqueue(2),

new recorderthreadfactory("recorderpooltest"),

newthreadpoolexecutor.callerrunspolicy()

);

假設,所有的任務都永遠無法執行完。

對於首先來的 [任務a] 和 [任務b] 來說直接執行,接下來,如果來了 [任務c] 和 [任務d],他們會被放到 queue 中,如果接下來再來 [任務e] 和 [任務f],則增加執行緒執行 [任務e] 和 [任務f]。但是如果再來任務,佇列無法再接受了,執行緒數也到達最大的限制了,所以就會使用拒絕策略來處理。

jdk中的解釋是:當執行緒數 > 核心指定數量時,keepalivetime 為終止前多餘的空閒執行緒等待新任務的最長時間。

有點拗口,其實這個不難理解,在使用了「池」的應用中,大多都有類似的引數需要配置。比如資料庫連線池,dbcp中的maxidle,minidle引數。

什麼意思?

比如:老闆派來的工人是我們苦口婆心「借來的」,俗話說「有借就有還」,但這裡的問題就是什麼時候還了,如果借來的工人剛完成乙個任務就還回去,後來發現任務還有,那豈不是又要去借?這一來一往,老闆肯定頭也大死了。

合理的策略:既然借了,那就多借一會兒。直到「某一段」時間後,發現再也用不到這些工人時,便可以還回去了。這裡的某一段時間便是 keepalivetime 的含義,timeunit 為 keepalivetime 值的度量。

執行緒池底層原理《二》

當執行緒數小於核心執行緒數時,建立執行緒 當執行緒數大於等於核心執行緒數,且任務佇列未滿時,將任務放入任務佇列 當執行緒數大於等於核心執行緒數,且任務佇列已滿時,1 若執行緒數小於最大執行緒數,建立執行緒 2 若執行緒數等於最大執行緒數,丟擲異常,拒絕任務 abortpolicy 丟棄任務,拋執行時...

執行緒池底層工作原理和流程

當提交任務執行excute時 1.若正在執行的執行緒數量小於corepoolsize,則執行緒池馬上建立執行緒執行任務 2.如果正在執行的執行緒數量大於等於corepoolsize,則多餘的任務加入阻塞佇列等待 3.如果佇列滿了且正在執行的執行緒數量小於maximumpoolsize,那麼還要建立非...

執行緒池底層核心介面簡析

threadpoolexcecutor構造方法引數 1.繼承關係 executor executorservice abstractexecutorservice threadpoolexecutor public threadpoolexecutor int corepoolsize,int ma...