Rxjava2原理解析,手写操作符实现

it2022-05-05  136

RxAndroid是RxJava的一个针对Android平台的扩展,主要用于 Android 开发

 

1 Rxjava项目地址:      https://github.com/ReactiveX/Rxjava

2 Rxjava文档:  https://mcxiaoke.gitbooks.io/rxdocs/content/

3 Rxjava经典资料:https://github.com/lzyzsd/Awesome-RxJava

4 操作符说明 :http://reactivex.io/documentation/operators/merge.html

 

Rxjava1和Rxjava2对比:

1 、 RxJava 2x 不再支持 null 值,如果传入一个null会抛出 NullPointerException

2 、Observable.just(null)(不支持)

3 、RxJava2 所有的函数接口(Function/Action/Consumer)均设计为可抛出Exception, 自己去解决编译异常需要转换问题。

4、 RxJava1 中Observable不能很好支持背压,在RxJava2 中将Oberservable实现成不支持背压,而新增Flowable 来支持背压。

 

Rxjava2 背压(Flowable),解决上游事件发送过快下游处理慢的情况。

背压四中策略:

1  BackpressureStrategy.ERROR:若上游发送事件速度超出下游处理事件能力,且事件缓存池已满,则抛出异常 //阻塞时队列

2 BackpressureStrategy.BUFFER:若上游发送事件速度超出下游处理能力,则把事件存储起来等待下游处理

3 BackpressureStrategy.DROP:若上游发送事件速度超出下游处理能力,事件缓存池满了后将之后发送的事件丢弃

4 BackpressureStrategy.LATEST:若上有发送时间速度超出下游处理能力,则只存储最新的128个事件。

 

下面讲一下Rxjava中的几个操作符是怎么实现的,有create、map、subscribeOn、ObserverOn。我们先讲create(好理解)在往后深入,一下并不能说相当于Rxjava源码,只能说是原理相同,细节上比较随意,如果想了解rxjava源码细节实现的,还是需要查看rxjava的源码。

//Observable Observable.create( //发射器 new ObservableOnSubscribe<String>() { @Override public void onSubscribe(Observer<? super String> observer) { observer.onNext("abcd"); } }) //订阅 .subscribe( //Observer new ObserverUser<String>() { @Override public void onNext(String integer) { Log.e(TAG, "onNext:" + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); 首先先讲一下整体实现的逻辑,有四个角色存在:观察者(ObserverUser),被观察者(Observable),发(ObservableOnSubscribe),订阅方法(subscribe)。 public class Observable<T> { private ObservableOnSubscribe<T> observableOnSubscribe; private Observable(ObservableOnSubscribe observableOnSubscribe) { this.observableOnSubscribe = observableOnSubscribe; } public static <T> Observable<T> create(ObservableOnSubscribe<T> observableOnSubscribe) { Observable observable = new Observable(observableOnSubscribe); return observable; } }

这只是Observable中的一部分代码,我们先只看create方法,下面先介绍发射器是什么:

public interface ObservableOnSubscribe<T> extends Action1<Observer<? super T>> { } public interface Action1<T> { void onSubscribe(T t); }

create传入了ObservableOnSubscribe<String>范型是String,并实现onSubscribe(T t)方法,可以看到Action1把范型String变成了Observer<String>并赋值给onsubscribe的参数中。然后在create中会把传入的发射器赋值到全局变量observableOnSubscribe。下面来看一下订阅方法subscribe,

public class Observable<T> { private ObservableOnSubscribe<T> observableOnSubscribe; private Observable(ObservableOnSubscribe observableOnSubscribe) { this.observableOnSubscribe = observableOnSubscribe; } public static <T> Observable<T> create(ObservableOnSubscribe<T> observableOnSubscribe) { Observable observable = new Observable(observableOnSubscribe); return observable; } public void subscribe(Observer<? super T> observer) { observableOnSubscribe.onSubscribe(observer); } }

这里会把Observer传入,并执行observable的onsubscribe方法,这里传入的Observer是<? super T>所以Observer的范型可以自动确认。看一下Observer:

public interface Observer<T> { public void onNext(T t); public void onError(Throwable e); public void onComplete(); } 所以在执行observer.onNext的时候,观察者onNext就会得到消息。

上述就是create操作符,下面来讲一下map,subscribeOn、observerOn。

先看一下这几个操作符的测试调用形式。

Observable.create(new ObservableOnSubscribe<String>() { @Override public void onSubscribe(Observer<? super String> observer) { Log.e(TAG, "thread1 :" + Thread.currentThread().getName()); observer.onNext("15"); } }).map(new Function<String, Integer>() { @Override public Integer apply(String s) { Log.e(TAG, "thread2 :" + Thread.currentThread().getName()); return Integer.valueOf(s); } }).subscribeOn() .observerOn() .subscribe(new ObserverUser<Integer>() { @Override public void onNext(Integer integer) { Log.e(TAG, "onNext:" + integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });

先讲一个大致的角色已经事件的传递过程,首先每一个操作符都会返回一个新的Observable对象,所以可以实现链式调度的功能,每个Observable里都会有一个自己的Observer,会持有上一个Observable和下面传入的Observer,事件发送分为订阅和发送两个过程。订阅是从下往上,subscribe把传入的observer传递给ObserverOn返回的Observable中,并触发发射器里的onSubscribe方法,执行subscribeOn所在的Observable的订阅方法又吧当前的Observer向上传递。事件发送是拿到下一个的Observer执行onnext()方法。

 

先看map操作符

public class Observable<T> { private ObservableOnSubscribe<T> observableOnSubscribe; private Observable(ObservableOnSubscribe observableOnSubscribe) { this.observableOnSubscribe = observableOnSubscribe; } public static <T> Observable<T> create(ObservableOnSubscribe<T> observableOnSubscribe) { Observable observable = new Observable(observableOnSubscribe); return observable; } public void subscribe(Observer<? super T> observer) { observableOnSubscribe.onSubscribe(observer); } public <R> Observable<R> map(Function<? super T, ? extends R> function) { return new Observable(new OnSubscribeLift(observableOnSubscribe, function)); } public Observable<T> subscribeOn() { return Observable.create(new OnObservableSchudelIo<T>(this)); } public Observable<T> observerOn() { return new Observable(new OnObservableSchudelMain(observableOnSubscribe)); } }

先介绍一下Function和OnsubscribeLift。

public interface Function<T, R> { R apply(T t); } public class OnSubscribeLift<T, R> implements ObservableOnSubscribe<R> { ObservableOnSubscribe<T> parent; private final OperatorMap<T, R> operatorMap; public <T> OnSubscribeLift(ObservableOnSubscribe observableOnSubscribe, Function<? super T, ? extends R> function) { this.parent = observableOnSubscribe; operatorMap = new OperatorMap(function); } @Override public void onSubscribe(Observer<? super R> observer) { Observer<? super T> apply = operatorMap.apply(observer); parent.onSubscribe(apply); } }

Function<T,R>其实就是一个接口,apply方法T为参数,R为返回值。

ObSubscribeLift<T,R>就是一个ObservableOnSubscribe发射器,内部实现onsubscribe方法。

在回到map方法中。会创建一个Observable,发射器为OnSubscribeLift并把create产生的Observable(这里用的是发射器,其实都差不多,因为都是为了调用发射器的订阅方法,而Observable中的subscribe也可以订阅)和function传入。在OnSubscribeLift

构造中创建了OperatorMap把function传入。

public class OperatorMap<T, R> implements OperateFunction<T, R> { private final Function function; public <T, R> OperatorMap(Function<? super T, ? extends R> function) { this.function = function; } //传入下面的observer传入当前的observer并返回 @Override public Observer<? super T> apply(Observer<? super R> observable) { return new MapScriber<>(observable, function); } //内置观察者 class MapScriber<T, R> implements Observer<T> { private final Function<? super T, ? extends R> function; private final Observer<? super R> outSideObserver; public MapScriber(Observer<? super R> observable, Function<? super T, ? extends R> function) { this.outSideObserver = observable; this.function = function; } @Override public void onNext(T t) { R apply = function.apply(t); outSideObserver.onNext(apply); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } } }

在看一下OperateFunction<T,R>

public interface OperateFunction<T, R> extends Function<Observer<? super R>, Observer<? super T>> { }

在执行订阅的时候,会走到OnSubscribeLift中的onSubscribe

然后执行operatorMap.apply(observer)得到当前的observer,也就是OperatorMap中的内部类MapScriber,再通过parent.onsuvscribe(apply)订阅事件向上传递。

在事件向下发送的时候会执行到Mapscriber中的onNext

//内置观察者 class MapScriber<T, R> implements Observer<T> { private final Function<? super T, ? extends R> function; private final Observer<? super R> outSideObserver; public MapScriber(Observer<? super R> observable, Function<? super T, ? extends R> function) { this.outSideObserver = observable; this.function = function; } @Override public void onNext(T t) { R apply = function.apply(t); outSideObserver.onNext(apply); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }

在这里执行Map传入的Function.apply方法,让用户去操作然后返回数据,在执行outSideObserver.onNext向下继续发送。

 

以上Map的代码就讲解完了,Map操作符比较难理解,理解了之后subscribeOn和observerOn就好理解了。

subscribeOn其实就是在订阅的时候扔到线程里执行。

@Override public void onSubscribe(Observer<? super T> observer) { final ScheulderIoSub scheulderIoSub = new ScheulderIoSub(observer); executorService.execute(new Runnable() { @Override public void run() { tObservable.subscribe(scheulderIoSub); } }); }

 

observerOn其实就是在发送时间的时候放到主线程执行

handler = new Handler(Looper.getMainLooper()); @Override public void onNext(final T t) { handler.post(new Runnable() { @Override public void run() { //在主线程向下发送 observer.onNext(t); } }); }

项目地址: https://github.com/wangchao0837/RxjavaOperation


最新回复(0)