手寫rxjava事件變換

2021-08-18 05:02:45 字數 4167 閱讀 8650

首先還是看怎麼使用

observable.just("")

.map(new function()

}).map(new function()

}).subscribeon(schedulers.io()).

observeon(androidschedulers.mainthread())

.subscribe(new consumer()

});

看原始碼分析

just原始碼分析

public

static

observablejust(t item)

接下來看這個原始碼subscribe(new consumer() )consumer原始碼分析

public

inte***ce consumer

subscribe原始碼分析

public

final disposable subscribe(consumer super t> onnext)

public

final disposable subscribe(consumer super t> onnext, consumer super throwable> onerror,

action oncomplete, consumer super disposable> onsubscribe)

自己手寫subscribe方法

首先還是寫觀察者介面

public

inte***ce observer

然後寫訂閱介面observablesource

public

inte***ce observablesource

observablejust物件封裝

public

class observablejustextends observable

@override

protected

void

subscribeactual(observer super t> observer)

//執行在這個類

private

class scalardisposable

public

void

run() catch (exception e) }}

}

然後先寫mainactivity的使用

observable.just("")

.subscribe(new consumer()

});

consumer介面

public

inte***ce consumer

observable

public

abstract

class

observable

implements

observablesource

//實際就是返回observablejust物件

private

static

observableonassembly(observablejustsource)

@override

public

void subscribe(@nonnull observer super t> observer)

//訂閱

protected

abstract

void subscribeactual(observer super t> observer);

public

void subscribe(consumerconsumer)

}

lambdaobserver

public

class lambdaobserverimplements observer

@override

public

void

onsubscribe()

@override

public

void

onnext(@nonnull t s)

@override

public

void

onerror(@nonnull throwable e)

@override

public

void

oncomplete()

}

map操作符原始碼分析

map(new function

public

inte***ce function

map原始碼分析

public

final

//實際是將我們的傳入的引數和目標轉換成observablemap this是observablesource

}

observablemap看這個類,它會複寫subscribeactual

public

void

subscribeactual(observer super u> t)

首先看mainactivity的使用

new thread(new runnable() 

}).map(new function()

}).map(new function()

}).subscribe(new consumer()

});}

});}

}).start();

function介面

//r是目標

public

inte***ce function

obserable新增**

public observablemap(function

function)

observablemap類

public

class

observablemap

extends

observable

//重新複寫了訂閱

RxJava 變換操作

buffer buffer操作符,將原有observable發射的資料快取起來,比如buffer 2 就每2個資料放進乙個集合,然後發射這個集合出去。buffer方法有很多過載的方法。flatmap 該操作符,使用乙個指定的函式對原始observable發射的每一項資料執行變換操作 lift 這個函...

RXJava 變換操作

這個頁面展示了可用於對observable發射的資料執行變換操作的各種操作符。變換操作 map 對序列的每一項都應用乙個函式來變換observable發射的資料序列 flatmap concatmap and flatmapiterable 將observable發射的資料集合變換為observab...

RxJava實現事件匯流排 RxBus 及詳解

先把大家都認同的實現 貼出來 public class rxbus private static class bussingleholder 單例模式 public static rxbus getinstance 傳送訊息 即呼叫所有觀察者的onnext 方法 param obj public v...