如何以併發方式在同乙個流上執行多種操作? 複製流

2021-09-13 11:14:08 字數 1498 閱讀 3815

正常情況下,乙個流在執行一次終端操作之後便結束了。本文通過複製流內資料的方式,曲折的實現了同乙個流上執行多次操作。

demo只是思路,其效能並不一定高效,尤其是資料都在記憶體中處理時複製的開銷很大。但如果流涉及大量i/o,也許效能會有提高。

public class streamforker

public streamforkerfork(object key, function, ?> f)

public results getresults() finally

return consumer;

}private forkingstreamconsumerbuild() , (m1, m2) -> );

return new forkingstreamconsumer<>(queues, actions);

}private future<?> getoperationresult(list> queues, function, ?> f)

}

accept方法將原始流中所有的資料新增到各個blockingqueue內,此處實現了複製

class forkingstreamconsumerimplements consumer, results 

@override

public void accept(t t)

@suppresswarnings("unchecked")

void finish()

@suppresswarnings("unchecked")

@override

public r get(object key) catch (exception e)

}}

此處重寫了tryadvance介面,只是簡單的從blockingqueue中取出資料,執行action。業務邏輯中複製流是為了做什麼事情,action就是這件事情。forkingstreamconsumer.end_of_stream是queue中資料結束的標示

class blockingqueuespliteratorimplements spliterator

@override

public boolean tryadvance(consumer<? super t> action) catch (interruptedexception e)

}if (t != forkingstreamconsumer.end_of_stream)

return false;

}@override

public spliteratortrysplit()

@override

public long estimatesize()

@override

public int characteristics()

}

多執行緒併發同乙個表問題

table for update for update of a.id a1.有where條件時,鎖定條件中指定的資料行 行級封鎖 2.無where條件是,鎖定表a 表級封鎖 1.有where條件時,鎖定條件中指定的資料行 行級封鎖 2.無where條件是,鎖定表a 表級封鎖 a,b直接封鎖a,b表...

如何同步共享同乙個list

例如多個執行緒要從同乙個list 中取物件,別的執行緒取了,其他執行緒則不可以再去這個物件.1.同步多執行緒 對 linklist 的removefirst 的操作或者 其他list 的remove 再get第乙個物件 的方法來實現.class sendsmstask implements runn...

celery 重複執行同乙個task

今天用celery 執行 task的時候碰到了 重複執行的情況,而且是重複執行了8次 電腦是8核的 谷歌了一下,celery 在執行task時有個機制,就是任務時長超過了 visibility timeout 時還沒執行完,就會指定其他worker重新開始task,預設的時長是一小時.但是我這個肯定...