RxJava2(五)執行緒排程器Scheduler

2021-09-24 12:32:50 字數 3132 閱讀 6970

scheduler可排程的執行緒有:

· single:定長為1的執行緒池(singledthreadexecutor),復用這個執行緒。

· newthread:啟動一條新執行緒,並在其中進行操作。

· computation:固定執行緒池(fixedthreadpool(n)),大小為cpu數,適合與密集型計算。

· io:無數量上限執行緒池(cachedthreadpool),可復用空閒執行緒,適合i/o操作,檔案讀寫,網路資訊互動等。

· trampoline:直接在當前執行緒優先執行。

· from:自定義排程器。

· androidschedulers.mainthread 主線程。

切換執行緒方法:

subscribeon(scheduler) 針對上游,指定處理資料在特定的執行緒排程器。若多次呼叫,只有第一次呼叫有用。

observeon(scheduler)    針對下游,指定下游操作在特定的執行緒排程器。若多次呼叫,每次都有用,來回切換。

private void init() 

}).observeon(schedulers.io())

.map(new function()

}).subscribeon(schedulers.computation())

.map(new function()

}).observeon(schedulers.newthread())

.map(new function()

}).observeon(androidschedulers.mainthread())

.subscribe(new consumer()

});}

private void showlog(context context, string whichscheduler, string value)

輸出結果:

testscheduler

專門用於測試的排程器,非執行緒安全。只有被呼叫了時間才會啟動。用於測試一些不引入真實併發性,允許推進虛擬時間的排程器。當未呼叫時間,排程器什麼也不做,相當於所有任務都未執行。當呼叫時間後,啟動相當於乙個時間流,好像他們本來就已經啟動好久了,你直接移動虛擬時間到這個時間流的任意時間節點,做操作就好。1.advancetimeto 將排程器的始終移動到某個時刻。

private void test() 

});testscheduler.createworker().schedule(new runnable()

},10,timeunit.milliseconds);

testscheduler.createworker().schedule(new runnable()

},20,timeunit.milliseconds);

//呼叫後,排程器啟動,移動到10秒位置

testscheduler.advancetimeto(10,timeunit.seconds);

log.d(tag, "移動虛擬時間後,當前時間 " + testscheduler.now(timeunit.seconds));

}

輸出結果:

run: 立即執行

run: 延遲10秒執行

run: 延遲20秒執行

移動虛擬時間後,當前時間 10

由於呼叫advancetimeto(10,timeunit.seconds) 相當於所有任務都準備好了,直接移動虛擬時間到任意時間節點。2.advanceby 在原來基礎上移動給定時間值。

3.triggeraction 不會修改時間,它執行計畫要啟動,但當前還未啟動的任務。

private void test() 

});testscheduler.createworker().schedule(new runnable()

},10,timeunit.milliseconds);

testscheduler.createworker().schedule(new runnable()

},20,timeunit.milliseconds);

log.d(tag, "當前時間 " + testscheduler.now(timeunit.seconds)); 

testscheduler.triggeractions();

}

輸出結果:

當前時間 0

run: 立即執行

由於未呼叫advancetimeto和advanceby,不啟動排程器。因此直接執行到列印log 「當前時間」,又因呼叫 testscheduler.triggeractions(),所以執行第乙個計畫執行而又未執行的worker執行緒。

增加呼叫advancetimeto

private void test() 

});testscheduler.createworker().schedule(new runnable()

},10,timeunit.milliseconds);

testscheduler.createworker().schedule(new runnable()

},20,timeunit.milliseconds);

log.d(tag, "當前時間 " + testscheduler.now(timeunit.seconds));

//呼叫後,排程器啟動,移動到10秒位置

testscheduler.advancetimeto(20,timeunit.seconds);

log.d(tag, "移動虛擬時間後,當前時間 " + testscheduler.now(timeunit.seconds));

testscheduler.triggeractions();

}

輸出結果:

當前時間 0

run: 立即執行

run: 延遲10秒執行

run: 延遲20秒執行

移動虛擬時間後,當前時間 20

可以看出當呼叫advancetimeto(20,timeunit.seconds)後,排程器被啟動。當呼叫triggeractions()時,因所有worker執行緒都被執行,所以沒有還沒啟動的任務,不列印結果。

RxJava2實現執行緒切換

被觀察者 observable 觀察者 observer 訂閱 subscribe 1 建立被觀察者 observable observable observable.create new observableonsubscribe 2 建立觀察者 observer observer new obs...

RxJava2原始碼解析

原始碼總結 observabel 通過create方法。將observableonsubscribe物件傳遞給自己。通過subscribe方法。建立 observableemitter發射器物件。發射器裡又封裝了observer。發射器又作為引數傳遞 給observableonsubscribe物件...

Rxjava2的簡單實用

第一種方式實現觀察者和被觀察者的建立和建立聯絡 1 建立被觀察者 建立被觀察者 傳送資料 observable observable observable.create new observableonsubscribe 2 建立觀察者 建立觀察者 接受資料 io.reactivex.observe...