併發程式設計之CyclicBarrier詳解

2021-10-12 04:23:14 字數 4062 閱讀 9442

cyclicbarrier(迴圈屏障),它是乙個同步助手工具,它允許多個執行緒在執行完相應的操作之後彼此等待共同到達乙個障點(barrier point)。cyclicbarrier 也非常適合用於某個序列化任務被分拆成若干個並行執行的子任務,當所有的子任務都執行結束之後再繼續接下來的工作。

cyclicbarrier 使一定數量的執行緒反覆地在屏障位置處匯集。當執行緒到達屏障位置時將呼叫 await 方法,這個方法將會阻塞,直到所有執行緒都到達屏障位置,當所有執行緒都到達屏障位置,那麼屏障將開啟,此時所有的執行緒都將被喚醒,而屏障將被重置以便下次使用。

通過  cyclicbarrier **結構我們可以發現其內部使用了 獨佔鎖  reentrantlock 和 condition 來實現執行緒之間的等待通知功能。

cyclicbarrier 一共提供了兩個構造方法,分布如下:

注意:

parties 引數必須 大於 0。否則會丟擲異常。

barrieraction 可以為null,  因為 barrieraction  中的**是在最後乙個到達屏障點的執行緒中執行,建議不要在此內部實現耗時較大的任務業務邏輯。

await 存在兩個過載方法,兩個**的實現非常相似,內部都是又呼叫了私有的 dowait 方法,兩者唯一區別就是,其中乙個帶有超時時間,如下所示:

public int await() throws interruptedexception, brokenbarrierexception  catch (timeoutexception toe) 

}

public int await(long timeout, timeunit unit)

throws interruptedexception,

brokenbarrierexception,

timeoutexception

await() 方法的作用是告訴 cyclicbarrier 我已經到達屏障點。但是,如果當前呼叫 await() 方法的執行緒不是最後乙個到達的,它將被休眠阻塞,直到以下其中一種情況發生,

從上面原始碼我們可以知道,await 方法內部其實都是呼叫了 dowait() 方法,那麼接下來重點分析一下 dowait 方法。

private int dowait(boolean timed, long nanos)

throws interruptedexception, brokenbarrierexception,

timeoutexception

// 獲取下標

int index = --count;

// 如果是 0,說明最後乙個執行緒呼叫了該方法

if (index == 0) finally

}// loop until tripped, broken, interrupted, or timed out

for (;;) catch (interruptedexception ie) else

}// 當有任何乙個執行緒中斷了,就會呼叫breakbarrier方法

// 就會喚醒其他的執行緒,其他執行緒醒來後,也要丟擲異常

if (g.broken)

throw new brokenbarrierexception();

// g != generation表示正常換代了,返回當前執行緒所在柵欄的下標

// 如果 g == generation,說明還沒有換代,那為什麼會醒了?

// 因為乙個執行緒可以使用多個柵欄,當別的柵欄喚醒了這個執行緒,就會走到這裡,所以需要判斷是否是當前代。

// 正是因為這個原因,才需要generation來保證正確。

if (g != generation)

return index;

// 如果有時間限制,且時間小於等於0,銷毀柵欄並丟擲異常

if (timed && nanos <= 0l)

}} finally

}

dowait() 方法的**邏輯也很簡單,大致流程如下:

在上面的源**中,我們可能需要注意generation 物件,在上述**中我們總是可以看到丟擲brokenbarrierexception異常,那麼什麼時候丟擲異常呢?如果乙個執行緒處於等待狀態時,如果其他執行緒呼叫reset(),或者呼叫的barrier原本就是被損壞的,則丟擲brokenbarrierexception異常。同時,任何執行緒在等待時被中斷了,則其他所有執行緒都將丟擲brokenbarrierexception異常,並將barrier置於損壞狀態。

同時,generation描述著cyclicbarrier的更新換代。在cyclicbarrier中,同一批執行緒屬於同一代。當有parties個執行緒到達barrier之後,generation就會被更新換代。其中broken標識該當前cyclicbarrier是否已經處於中斷狀態。

private static class generation
預設barrier是沒有損壞的。當barrier損壞了或者有乙個執行緒中斷了,則通過breakbarrier()來終止所有的執行緒:

private void breakbarrier()
在breakbarrier()中除了將broken設定為true,還會呼叫signalall將在cyclicbarrier處於等待狀態的執行緒全部喚醒。

當所有執行緒都已經到達barrier處(index == 0),則會通過nextgeneration()進行更新換地操作,在這個步驟中,做了三件事:喚醒所有執行緒,重置count,generation:

private void nextgeneration()
除了上面講到的柵欄更新換代以及損壞狀態,我們在使用cyclicbarrier時還要要注意以下幾點:

counddownlatch的await方法會等待計數器被count down到0,而執行cyclicbarrier的await方法的執行緒將會等待其他執行緒到達barrier point。

cyclicbarrier內部的計數器count是可被重置的,進而使得cyclicbarrier也可被重複使用,而counddownlatch則不能。

cyclicbarrier是由lock和condition實現的,而countdownlatch則是由同步控制器aqs(abstractqueuedsynchronizer)來實現的。

在構造cyclicbarrier時不允許parties為0,而countdownlatch則允許count為0。

下面我們看乙個簡單的例子,想必每個人都是非常喜歡旅遊的,旅遊的時候不可避免地需要加入某些旅行團。在每乙個旅行團中都至少會有乙個導遊為我們進行嚮導和解說,由於遊客比較多,為了安全考慮導遊經常會清點人數以防止個別旅客由於自由活動出現迷路、掉隊的情況。

我們可以看到,只有在所有的旅客都上了大巴之後司機才能將車開到下乙個旅遊景點。下面寫乙個程式簡單模擬一下。

public class cyclicbarrierexample 

@override

public void run() catch (interruptedexception | brokenbarrierexception e)

}public void sleep() catch (interruptedexception e) }}

public static void main(string args) );

try catch (interruptedexception | brokenbarrierexception e)

system.out.println("所有的遊客都已上車, 開始發車去往景點");}}

Java併發程式設計系列 CyclicBarrier

cyclicbarrier簡介 cyclicbarrier也是基於reentrantlock和condition的乙個同步工具類,它的作用是讓一些執行緒到達某個公共屏障點時,等待未到達的執行緒。當所有執行緒到達屏障點時,繼續往下執行。先看乙個例子 public class cyclicbarrier...

併發tools之柵欄CyclicBarrier

前言 cyclicbarrier翻譯過來就是 迴圈的屏障,這個類是乙個可以重複利用的屏障類.它允許一組執行緒相互等待,直到全部到達某個公共屏障點,然後所有的這組執行緒再同步往後執行 await 函式每被呼叫依次,計數便會減少1,並阻塞當前執行緒 當計數減至0,阻塞解除 countdownlatch和...

併發程式設計之併發佇列

jdk 中提供了一系列場景的併發安全佇列。總的來說,按照實現方式的不同可分為阻塞佇列和非阻塞佇列,前者使用鎖實現,而後者則使用cas 非阻塞演算法實現。1 非阻塞佇列 concurrentlinkedqueue concurrentlinkedqueue是無界非阻塞佇列,內部使用單項鍊表實現 其中有...