同步工具類

2022-07-04 00:30:12 字數 4659 閱讀 8511

同步工具類可以是任何乙個物件,只要它根據其自身的狀態來協調執行緒控制流。阻塞佇列(blockingqueue)可以作為同步工具類,其他型別的同步工具類還包括訊號量(semaphore),柵欄(barrier)以及閉鎖(latch)。在平台類庫中還包含其他一些同步工具類的類,如果這些類還無法滿足需要,那麼可以建立自己的同步工具類。

閉鎖可以延遲線程的進度直到其到達終止狀態。閉鎖的作用相當於一扇門:在閉鎖到達結束狀態之前,這扇門一直是關著的,並且沒有任何執行緒能通過,當到達結束狀態時,這扇門會開啟並允許所有的執行緒通過。當閉鎖到達結束狀態後,將不會再改變狀態,因此這扇門將永遠保持開啟狀態。閉鎖可以用來確保某些活動直到其他活動都完成後才繼續執行,例如:

countdownlatch是一種靈活的閉鎖實現,可以在上述各種情況下使用,它可以使乙個或多個執行緒等待一組事件發生。閉鎖狀態包括乙個計數器,該計數器被初始化為乙個正數,表示需要等待的事件數量。countdown方法將遞減計數器,表示有乙個事件已經發生了,而await方法等待計數器到達零,這表示所有需要的事件都已經發生。如果計數器的值為非零,那麼await會一直阻塞直到計數器為零,或者等待中的執行緒中斷,或者等待超時。

public classtestharness finally 

} catch(interruptedexception ignored) {}}};

t.start();

} long start = system.nanotime();

startgate.countdown();

long end = system.nanotime();

return end - start; }}

futuretask也可以用作閉鎖。(futuretask實現了future語義,表示一種抽象的可生成結果的計算)。futuretask表示的計算是通過callable來實現的,相當於一種可生成結果的runnable,並且可以處於以下3中狀態:等待執行(waiting to run),正在執行(running)和完成執行(completed)。「執行完成」表示計算的所有可能結束方式,包括正常結束、由於取消而結束和由於異常而結束等。當futuretask進入完成狀態以後,它會永遠停止在這個狀態上。

future.get的行為取決於任務的狀態。如果任務已經執行完成,那麼get會立即返回結果,否則get將阻塞知道任務進入完成狀態,然後返回結果或者丟擲異常。futuretask將計算結果從執行計算的執行緒傳遞到獲取這個結果的執行緒,而futuretask的規範確保了這種傳遞過程能實現結果的安全發布。

public classpreloader

});private final thread thread = new thread(future);

publicvoidstart()

publicproductinfo get()throwsdataloadexception, interruptedexception catch(executionexception e) else

} }}

preloader建立了乙個futuretask,其中包含從資料庫載入產品資訊的任務,以及乙個執行運算的執行緒。由於在建構函式或靜態初始化方法中啟動執行緒並不是一種好方法,因此提供了乙個start方法來啟動執行緒。當程式雖有需要productinfo時,可以呼叫get方法,如果資料已經載入,那麼將返回這些資料,否則將等待載入完成後再返回。

callable表示的任務可以丟擲受檢查的或未受檢查的異常,並且任何**都可能丟擲乙個error。無論任務**丟擲什麼異常,都會被封裝到乙個executionexception中,並在future.get中被重新丟擲。這將使呼叫get的**變得複雜,因為它不僅需要處理可能出現的executionexception(以及未檢查的cancellationexception),而且還由於executionexception是作為乙個throwable類返回的,因此處理起來並不容易。

計數訊號量(counting semaphore)用來控制同時訪問某個特定資源的運算元量,或者同時執行某個指定操作的數量。計數訊號量還可以用來實現某種資源池,或者對容器施加邊界

semaphore中管理著一組虛擬的許可(permit),許可的初始數量可通過建構函式來指定。在執行操作時可以首先獲得許可(只要還有剩餘的許可),並在使用以後釋放即可。如果沒有許可,那麼aquire將阻塞直到有許可(或者直到被中斷或者操作超時)。release方法將返回乙個許可給訊號量。計算訊號量的一種簡化形式是二值訊號量,即初始值為1的semaphore。二值訊號量可以用作互斥體(mutex),並具備不可重入的加鎖語義:誰擁有這個唯一的許可,誰就擁有了互斥鎖。

semaphore可以用於實現資源池,例如資料庫連線池。我們可以構造乙個固定長度的資源池,當池為空時,請求資源將會失敗,但你真正希望看到的行為是阻塞而不是失敗,並且當池非空時解除阻塞。如果將semaphore的計數值初始化為池的大小,並在從池中獲取乙個資源之前先呼叫aquire方法獲得乙個許可,在將資源返回給池之後呼叫release釋放許可,這樣就實現了固定長度了。

同樣,你可以使用semaphore將任何一種容器變成有界阻塞容器,如下**所示。訊號量的計數值會初始化為容器容量的最大值。add操作在向底層容器中新增乙個元素之前,首先需要獲得乙個許可。如果add操作沒有新增任何元素,那麼會立刻釋放許可。同樣remove操作釋放乙個許可,使更多的元素能夠新增到容器中。底層的set實現並不知道關於邊界的任何資訊,這是由boundedhashset來處理的。

public classboundedhashset 

publicbooleanadd(t o)throwsinterruptedexception finally

} }publicbooleanremove(object o)

return wasremoved; }}

我們已經看到通過閉鎖來啟動一組相關的操作,或者等待也組相關的操作結束。閉鎖是一次性物件,一旦進入終止狀態,就不能被重置。

柵欄(barrier)類似於閉鎖,它能阻塞執行緒直到某個事件發生。柵欄與閉鎖的關鍵區別在於:所有執行緒必須同時到達柵字段置,才能繼續執行。閉鎖也用於等待事件,而柵欄也用於等待其他執行緒。柵欄用於實現一些協議,例如幾個家庭決定在某個地方集合:「所有人6:00在麥當勞碰頭,到了以後要等其他人,之後再討論下一步要做的事情。」

cyclicbarrier可以使一定數量的參與方反覆地在柵字段置匯集,它在並行迭代演算法中非常有用:這種演算法通常將也乙個問題拆分成一系列相互獨立的子問題。當執行緒到達柵字段置時,將呼叫await方法,這個方法將阻塞直到所有執行緒都到達阻塞位置。如果所有執行緒都到達了柵欄的位置,那麼柵欄將開啟,此時所有執行緒都將被釋放,而柵欄將被重置以便下次使用。如果對await的呼叫超時,或者await阻塞的執行緒被中斷,那麼柵欄就被認為是打破了,所有阻塞的await呼叫都將終止並丟擲brokenbarrierexception。如果成功地通過柵欄,那麼await將為每個執行緒返回乙個唯一的到達索引號,我們可以利用這些索引來「選舉」產生乙個領導執行緒,並在下一次迭代中由該領導執行緒執行一些特殊的工作。cyclicbarrier還可以使你將乙個柵欄引數傳遞給建構函式,這是乙個runnable,當成功通過柵欄時會(在下乙個子任務執行緒中)執行它,但在阻塞執行緒被釋放之前是不能執行的。

下面的例子中給出了如何通過柵欄來計算細胞的自動化模擬。在把模擬過程並行化時,為每個元素(這裡為每個細胞)分配乙個獨立的執行緒是不現實的,也因為這將產生過多的執行緒,而在協調這些執行緒上導致的開銷將降低計算效能。合理的做法是,將問題分解成一定數量的子問題,為每個子問題分配乙個執行緒來進行求解,之後再將所有的結果合併起來。cellularautomata將問題分解為ncpu(可用的cpu數量)個子問題,並將每個子問題分配給乙個執行緒。在每個步驟中,工作執行緒都為各自子問題中的所有細胞計算新值。當所有工作執行緒都到達柵欄時,柵欄會吧這些新值交給資料模型。在柵欄的操作執行完成以後,工作執行緒將開始下一步的計算,包括呼叫isdone方法來判斷是否需要進行下一次迭代。

public classcellularautomata

});this.workers = new worker[count];

for (int i = 0; i < count; i++) }

private classworkerimplementsrunnable

publicvoidrun()

}try catch(interruptedexception ex) catch(brokenbarrierexception ex)

}} }

publicvoidstart()

mainboard.waitforconvergence(); }}

另一種形式的柵欄就是exchanger,它是一種兩方(two-party)柵欄,各方在柵字段置上交換資料。當兩方執行不對稱的操作時,exchanger會非常有用,例如當也乙個執行緒向緩衝區寫入資料,而另乙個執行緒從緩衝區中讀取資料。這些執行緒可以使用exchanger來匯合,並將滿的緩衝區與空的緩衝區交換。當兩個執行緒通過exchanger交換物件時,這種交換就把這兩個物件安全地發布給另一方。

資料交換的時機取決於應用程式的響應需求。最簡單的方案是,當緩衝區被填滿時,由填充任務進行交換,當緩衝區為空時,由清空任務進行交換。這樣會把需要交換的次數降至最低,但如果新資料的到達率不可**,那麼一些資料的處理過程就將也延遲。另乙個方法是,不僅當緩衝被填滿時進行交換,並且當緩衝被填充到一定程度並保持也一定時間也以後,也進行交換。

同步工具類

同步工具類可以是任何乙個物件,只要它根據其自身的狀態來協調執行緒的控制流。儲存物件的容器,還能協調生產者和消費者等執行緒之間的控制流 take和put等方法將阻塞,直到佇列達到期望的狀態 佇列即非空,也非滿 相當於一扇門 在閉鎖到達結束狀態之前,這扇門一直是關閉的,並且沒有任何執行緒能通過,當到達結...

同步工具類 CountDownLatch

countdownlatch是乙個同步工具類,它允許乙個或多個執行緒一直等待,直到其他執行緒執行完後再執行。例如,應用程式的主線程希望在負責啟動框架服務的執行緒已經啟動所有框架服務之後執行。countdownlatch是通過乙個計數器來實現的,計數器的初始化值為執行緒的數量。每當乙個執行緒完成了自己...

同步工具類之 Latch

同步工具類除了最熟悉的阻塞佇列之外,還包括semaphore barrier以及latch。同樣,我們也可以建立屬於自己的同步工具類。所有的同步工具類都包含了一些特定的結構屬性 比如,封裝了一些狀態,而這些狀態將決定執行同步工具類的執行緒是繼續執行海蛇等待,除此而外,還提供了一些方法對狀態進行操作,...