併發程式設計之 Exchanger 原始碼分析

2021-09-11 12:32:54 字數 4384 閱讀 4903

juc 包中除了 countdownlatch, cyclicbarrier, semaphore, 還有乙個重要的工具,只不過相對而言使用的不多,什麼呢? exchange —— 交換器。用於在兩個執行緒之間交換資料,a 執行緒將 a 資料交給 b 執行緒,b 執行緒將 b 資料交給 a 執行緒。

具體使用例子參見 併發程式設計之 執行緒協作工具類。我們這篇文章就不再講述如何使用了。

而今天,我們將從原始碼處分析,exchange 的實現原理。如果大家看過之前關於 synchronousqueue 的文章 併發程式設計之 synchronousqueue 核心原始碼分析,就能夠看的出來,exchange 的原理和他很類似。

類 uml:

內部有 2 個內部類: node , participant 重寫了 threadlocal 的 initialvalue 方法。

構造方法如下:

public

exchanger

()static

final

class

participant

extends

threadlocal

}複製**

就是建立了乙個 threadlocal 物件,並設定了初始值,乙個 node 物件。

看看這個 node 物件:

@sun.misc.contended 

static

final

class

node

複製**

這個 node 物件就是 a ,b 執行緒實際儲存資料的容器。a 執行緒存在 item 屬性上,b 執行緒儲存在 match 執行緒上,稱為匹配。同時,有個執行緒物件,你應該猜到做什麼用處的吧,對,掛起執行緒的。

和 synchronousqueue 的區別在於, synchronousqueue 使用了乙個變數來儲存資料項,通過 isdata 來區別 「存」 操作和 「取」 操作。而 exchange 使用了 2 個變數,就不用使用 isdata 來區分了。

我們再來看看 exchange 的唯一重要方法 : exchange 方法。

**如下:

public v exchange

(v x)

throws interruptedexception

return (v == null_item) ? null : (v)v;

}複製**

說一下方法的邏輯:

如果執行 slotexchange 有結果,就不再執行 arenaexchange.

如果 slot 被占用了,就執行 arenaexchange.

返回值是什麼呢?返回值就是對方執行緒的資料項,如果 a 執行緒先呼叫,那麼 a 執行緒將資料項存在 item 中,b 執行緒後呼叫,則 b 執行緒將資料存在 match 屬性中。

a 返回的是 match 屬性,b 返回的是 item 屬性。

從該方法中,可以看到,有 2 個重要的方法: slotexchange, arenaexchange。先簡單說說這兩個方法。

當沒有多執行緒併發操作 exchange 的時候,使用 slotexchange 就足夠了。 slot 是乙個 node 物件。

當出現併發了,乙個 slot 就不夠了,就需要使用乙個 node 陣列 arena 操作了。

so,我們先看看 slotexchange 方法吧,兩個方法的邏輯類似。

**加注釋如下:

private

final object slotexchange

(object item, boolean timed, long ns)

// 如果使用 cas 修改slot 失敗了,說明 slot 被使用了,那就需要建立 arena 陣列了

if (ncpu > 1 && bound == 0 &&

u.compareandswapint(this, bound, 0, seq)) // seq == 256; 預設 bound == 0

arena = new node[(full + 2) << ashift];// length = (2 + 2) << 7 == 512

}// 如果 slot 是 null, 但 arena 有值了,說明有執行緒競爭 slot 了,返回 null, 執行 arenaexchange 邏輯

else

if (arena != null)

return

null; // caller must reroute to arenaexchange

else

}// 當走到這裡的時候,說明 slot 是 null, 且 arena 不是 null(沒有多執行緒競爭使用 slot),並且成功將 item 放入了 slot 中.

// 這個時候要做的就是阻塞自己,等待對方取出 slot 的資料項,然後重置 slot 的資料和池化物件的資料

// 偽隨機數

int h = p.hash;

// 超時時間

long end = timed ? system.nanotime() + ns : 0l;

// 自旋,預設 1024

int spins = (ncpu > 1) ? spins : 1;

object v;

// 如果這個值不是 null, 說明資料被其他執行緒拿走了, 並且其他執行緒將資料賦值給 match 屬性,完成了一次交換

while ((v = p.match) == null)

// 如果自旋數不夠了,且 slot 還沒有得到,就重置自旋數

else

if (slot != p)

spins = spins;

// 如果 slot == p 了,說明對 slot 賦值成功

// 如果執行緒沒有中斷 && 陣列不是 null && 沒有超時限制

else

if (!t.isinterrupted() && arena == null &&

(!timed || (ns = end - system.nanotime()) > 0l))

// 如果有超時限制,使用 cas 將 slot 從 p 變成 null,取消這次交換

else

if (u.compareandswapobject(this, slot, p, null))

}// 將 p 的 match 屬性設定成 null, 表示初始化狀態,沒有任何匹配 >>> putorderedobject是putobjectvolatile的記憶體非立即可見版本.

u.putorderedobject(p, match, null);

// 重置 item

p.item = null;

// 保留偽隨機數,供下次種子數字

p.hash = h;

// 返回

return v;

}複製**

原始碼還是有點小長的。簡單說說邏輯。

exchange 使用了物件池的技術,將物件儲存在 threadlocal 中,這個物件(node)封裝了資料項,執行緒物件等關鍵資料。

當第乙個執行緒進入的時候,會將資料放到 池化物件中,並賦值給 slot 的 item.並阻塞自己(通常不會立即阻塞,而是使用 yield 自旋一會兒),等待對方取值.

當第二個執行緒進入的時候,會拿出儲存在 slot item 中的值, 然後對 slot 的 match 賦值,並喚醒上次阻塞的執行緒.

當第乙個執行緒阻塞被喚醒後,說明對方取到值了,就獲取 slot 的 match 值, 並重置 slot 的資料和池化物件的資料,並返回自己的資料.

如果超時了,就返回 time_out 物件.

如果執行緒中斷了,就返回 null.

在該方法中,會返回 2 種結果,一是有效的 item, 二是 null--- 要麼是執行緒競爭使用 slot 了,建立了 arena 陣列,要麼是執行緒中斷了.

用一幅圖來看看具體邏輯,其實還是挺簡單的。

當 slot 被別是執行緒使用了,那麼就需要建立乙個 arena 的陣列了。通過操縱陣列裡面的元素來實現資料交換。

關於 arenaexchange 方法的原始碼我就不貼了,有 2 個原因,乙個是總體邏輯和 slotexchange 相同,第二個原因則是,其中有一些細節我沒有弄懂,就不發出自己寫**注釋了,防止誤導。但我們已經掌握了 exchange 的原理。

exchange 和 synchronousqueue 類似,都是通過兩個執行緒操作同乙個物件實現資料交換,只不過就像我們開始說的,synchronousqueue 使用的是同乙個屬性,通過不同的 isdata 來區分,多執行緒併發時,使用了佇列進行排隊。

exchange 使用了乙個物件裡的兩個屬性,item 和 match,就不需要 isdata 屬性了,因為在 exchange 裡面,沒有 isdata 這個語義。而多執行緒併發時,使用陣列來控制,每個執行緒訪問陣列中不同的槽。

最後,用我們的圖收尾吧:

java併發程式設計之Exchanger

exchanger v 可以交換的物件型別 可以在對中對元素進行配對和交換的執行緒的同步點。每個執行緒將條目上的某個方法呈現給 exchange 方法,與夥伴執行緒進行匹配,並且在返回時接收其夥伴的物件。exchanger 可能被視為 synchronousqueue 的雙向形式。exchanger...

併發程式設計之 CyclicBarrier 原始碼分析

在之前的介紹 countdownlatch 的文章中,countdown 可以實現多個執行緒協調,在所有指定執行緒完成後,主線程才執行任務。但是,countdownlatch 有個缺陷,這點 jdk 的文件中也說了 他只能使用一次。在有些場合,似乎有些浪費,需要不停的建立 countdownlatc...

併發工具原始碼系列 Exchanger 原始碼解析

exchanger 執行緒之間可以進行元素交換 了解就行了 但是如果多個執行緒都來交換了,那乙個 node 效率太低,所以就提供了個 node 陣列叫 arena 讓執行緒們當作場地來交換。那麼現在交換的話就要在迴圈中進行了,因為槽位多了,就有很多時候沒有交換物件或者被別的執行緒搶走了。屬性 sun...