Rxjava
MoMo Lv5

概念

RxJava的异步实现,是通过一种拓展的观察者模式来实现的。

观察者模式面向的需求是:A对象(观察者)对B对象(被观察者)的某种变化高度敏感,需要在B对象变化的一瞬间做出变化。

程序的观察者模式,观察者不需要时刻盯着被观察者,而是采用注册(Regsiter)或者订阅(Subscribe)的方式,告诉被观察者,我是你的某种状态,你的状态在发生变化的时候来通知我。

Rxjava是NetFlix出品的Java框架, 官方描述为 a library for composing asynchronous and event-based programs using observable sequences for the Java VM,翻译过来就是“使用可观察序列组成的一个异步地、基于事件的响应式编程框架”。一个典型的使用示范如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
String s = "1234";
//执行耗时任务
emitter.onNext(s);
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();

Rxjava原理

image

流式构建和事件传递

在Rxjava中,有Observable和Observer这两个核心的概念,但是它们在发生订阅时,跟普通的观察者模式写法不太一样,因为常识来讲,应该是观察者去订阅(subscribe)被观察者,但是Rxjava为了其基于事件的流式编程,只能反着来,observable去订阅observer,所以在rxjava中,subscribe可以理解“注入”观察者。

图中方形的框代表的是Observable,因为它代表节点,所以用Ni表示,圆形框代表的是观察者Observer,用Oi标识,后面加括号的意思是Oi持有其下游Observer的引用,左侧代表上游,右侧代表下游。图片里有三条有方向的彩色粗线,代表三个不同的流,这三个流是我们为了分析问题而抽象出来的的,代表从构建到订阅整个事件的流向,按照时间顺序从上到下依次流过,它们的含义分别是:

  • 从左往右的构建流:用来构建整个事件序列,这个流表征了整个链路的构建过程,相当于构造方法。
  • 从右往左的订阅流:当最终订阅(subscribe方法)这个行为发生的时候,每个节点从右向左依次执行订阅行为。
  • 从左往右的观察者回调流:当事件发生以后,会通过这个流依次通知给各个观察者。

构建流

在使用Rxjava时,其流式构建流程是很大的特色,避免了传统回调的繁琐。怎么实现的呢?使用过Rxjava的读者应该都知道,Rxjava的每一步构建过程api都是相同的,这是因为每一步的函数返回结果都是一个Observable,Observable提供了Rxjava所有的功能。Obsevable就是构建流的组件,我们可以看成一个个节点,这些节点串起来组成整个链路。Observable这个类实现了一个接口:ObservableSource,这个接口只有一个方法:subscribe(observer),也就是说,所有的Obsevable节点都具有订阅这个功能:

在编写Rxjava代码时,每一步操作都会生成一个新的Observable节点(包括ObserveOn和SubscribeOn线程变换操作),并将新生成的Observable返回,直到最后一步执行subscribe方法

无论是构建的第一步 create方法,还是observeOn,subscribeOn变换线程方法,还是各种操作符比如map,flatMap等,都会生成对应的Observable,每个Observble中要实现一个最重要的方法就是subscribe:

1
2
3
4
5
6
7
8
9
10
11
public final void subscribe(Observer<? super T> observer) {
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer); // important
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
RxJavaPlugins.onError(e);
throw npe;
}
}

其中subscribeActual(observer);,每个节点在执行subscribe时,其实就是在调用该节点的subscribeActual方法,这个方法是抽象的,每个节点的实现都不一样。举个例子,拿ObseverOn这个操作生成的ObservableSubscribeOn:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
//xxx省略
}

其中其父类继承Observable,所以它是一个Observble。

整个过程有点像builder模式,不同之处是它是生成了新的节点,而builder模式返回的自身。okHttp中拦截器跟这里有些相似,okHttp中会构建多个Chain节点,然后用相应的Intercepter去处理Chain。

编写Rxjava代码的过程其实就是构建一个一个Observable节点的过程

订阅流

构建过程只是通过构造函数将一些配置传给了各个节点,实际还没有执行任何代码,只有最后一步才真正的执行订阅行为。当最后一个节点调用subscribe方法时,是构建流向订阅流变化的转折点,

以图中为例:最后一个节点是N5,N5节点是最后一个flatmap操作符方法产生的,也就是说,最后是调用这个节点的subscribe方法,这个方法最终也是会调用到subscribeActual方法中去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;

public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
}

N5节点是Observable节点,其subscribe方法最后调用的是subscribeActual方法

1
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));

这行代码需要注意两点:

  1. 生成了一个新的Observer,注意其构造函数中第一个参数t,保存到了downstream这个“下游”变量中,这个t从哪儿传进来的呢?对于N5节点来说,这个t就是我们代码中最后一步编写的Observer,比如我们常用的网络请求返回后的回调。也就是说,这个新生成的Observer包含了它的“下游”观察者的引用,在图片中对应最右边的圆形框O1(observer)。

  2. 执行订阅行为,这里的source是该节点构造函数传入的source,通过源码得知其实就是N5节点的上一个节点N4,因此,这里的订阅行为本质上是让当前节点的上一个节点订阅当前节点新生成的Observer

事实上,每个节点的执行流程都是类似的,也就是说,N5会调用N4的subscribe方法,而在N4的subscribe方法中,又去调用了N3的subscribe….一直到N0会调用source的subscribe方法。总结下来就是:

从最后一个N5节点的订阅行为开始,依次执行前面各个节点真正的订阅方法。在每个节点的订阅方法中,都会生成一个新的Observer,这个Observer会包含“下游”的Observer,这样当每个节点都执行完订阅(subscribeActual)后,也就生成了一串Observer,它们通过downstream,upstream引用连接。

以上就是订阅流的发生过程,简单讲就是下游节点调用上游节点的subscribeActual方法,从而形成了一个调用链

观察者回调流

当订阅流执行到最后,也就是第一个节点N0时,N0节点怎么建立的:

1
2
3
4
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

生成了ObservableCreate实例,我们看这个类(简化):

1
2
3
4
5
6
7
8
9
10
11
12
13
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
source.subscribe(parent);
}
}

所以订阅流的最终会掉到上面的subscrbeActual方法,它其实还是和其他节点一样,最主要的还是执行了source.subscribe(parent)

1
2
3
4
5
6
7
8
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
String s = "1234";
//执行耗时任务
emitter.onNext(s);
}
})

上面代码直接拿的开头的例子,这个source是一个ObservableOnSubscribe,看它的subscribe方法里,这个函数里面其实是订阅流和观察者流的转折点,也就是流在这儿“转向了”。这里,这个事件源没有像节点那样,调用上一个节点的订阅方法,而是调用了其参数的emitter的onNext方法,这个emitter对应N0节点是CreateEmitter这个类,这个类里面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {

final Observer<? super T> observer;

CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (!isDisposed()) {
observer.onNext(t);
}
}
//省略
}

看它的onNext方法,执行的是observer.onNext(t)

observer是谁?构造函数传进来的,也就是N0节点subscribeActual方法中的observer,这个observer是前一个节点N1生成的Observer,我们看N1节点,是一个Map,对应的Observable节点里的Observer源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;

MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}

@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}

MapObserver的onNext方法,核心就两句,一个是mapper.apply(t),另一个就是downstream.onNext(v)。也就是说,这个mapObserver干了两件事,一个是把上个节点返回的数据进行一次map变换,另一个就是将map后的结果传递给下游,下游是N2节点的Observer,对应图中O4,依次类推,事件发生以后,通过各个节点的Observer事件源被层层处理并传递给下游,一直到最后一个观察者执行完毕,整个事件处理完成。

线程调度

SubscribeOn

在订阅流发生的的时候,大多数节点都是直接调用上一个节点的subscribe方法,实现虽有差别,但大同小异。唯一有个最大的不同就是subscribeOn这个节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}

@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

observer.onSubscribe(parent);

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}

普通的节点执行时,大多只是简单的执行source.subscribe(observer),但是这个不一样。先看第二行,它调用了观察者的onSubscribe方法,在自定义Observer的时候,里面有这个回调。

scheduler.scheduleDirect(new SubscribeTask(parent))

1
2
3
4
5
6
7
8
9
10
11
12
13
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}

创建了一个worker,一个runnable,然后将二者封装到一个DisposeTask中,最后用worker执行这个task

1
2
@NonNull
public abstract Worker createWorker();

createworker是一个抽象方法,所以需要去找Scheduler的子类,如果在子线程中执行,我们一般设置调度器为Schedulers.io():

在IOSchedluer类中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;

这里的executor就是一个ExecutorService,这里的submit方法,就是将callable丢到线程池中去执行任务了。

scheduler.scheduleDirect(new SubscribeTask(parent))

对于io线程的调度器来说,上面的代码就是将new SubscribeTask(parent)丢到线程池中执行,我们看参数里面的SubscribeTask:

1
2
3
4
5
6
7
8
9
10
11
12
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;

SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}

@Override
public void run() {
source.subscribe(parent);
}
}

看run方法:source.subscribe(parent),这里的parent跟普通节点一样,仍然是本节点生成的新的Observer,对于本节点来说,是一个SubscribeOnObserver。因此,对于subscribeOn这个节点,它跟普通的节点不同之处在于:

  • SubscribeOn节点在订阅的时候,将它的上游节点的订阅行为,以runnable的形式扔给了一个线程池(对于IO调度器来说),也就是说,当订阅流流到SubscribeOn节点时,线程发生了切换,之后流向的节点都在切换后的线程中执行。

那么线程切换的原理就是在订阅流中塞了一个线程变化操作。当订阅流流过这个节点后,后面的节点只是单纯的传递给上游节点而已,无论是普通的操作符,还是ObserveOn节点,都是简单的传递给上游,没有做线程切换

如果上游还有别的subscribeOn,会发生什么?

假设N1节点的map修改程subscribeOn(AndroidScheduler.Main),也就是说,切换到主线程。我们还是从N2节点开始分析,刚才说到最后会执行到SubscribeTask里的Run方法,注意此时source.subscribe(parent)发生在子线程中,接下来,回调用N1节点的subscribe,N1节点回调用scheduler.scheduleDirect(new SubscribeTask(parent)),方法,此时,因为线程调度器是主线程的,我们看它的代码:

1
2
3
4
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}

看看这个HandlerScheduler的方法:

1
2
3
4
5
6
7
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, unit.toMillis(delay));
return scheduled;
}

这里会把N1节点上游的操作,通过Handler机制,扔给主线程操作,虽然这一步是在N2节点的子线程中执行的,但是它之前的事件仍然会在主线程中执行。因此:

subscribeOn节点影响它前面的节点的线程,如果前面还有多个subscribeOn节点,最终只有第一个,也就是最上游的那个节点生效

ObserveOn

前面的subscribeOn线程切换是在订阅流中发生的,接下来的ObserveOn比较简单,它发生在第三条流-观察者回调流中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
//简化
@Override
public void onNext(T t) {
if (done) {
return;
}

if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
}

观察者流是通过onNext()方法传递的,最后一行,schedule(),线程切换

1
2
3
4
5
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}

worker是这个节点订阅时指定的 scheduler.createWorker(), 以主线程观察为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}

同样,通过Handler机制,将runnable扔给主线程执行,runnable是是this,就是这个ObserveOnObserver。看它的run方法:

1
2
3
4
5
6
7
8
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}

drainNormal:

1
2
3
4
5
6
7
void drainNorml() {
//简化
final Observer<? super T> a = downstream;
T v;
v = q.poll();
a.onNext(v);
}

还是把上游的处理结果扔给下游。也就是说observeOn会将它下游的onNext操作扔给它切换的线程中,因此ObserveOn影响的是它的下游。

如果有多个observeOn会发生什么?很简单,思路同subscribeOn,每个ObserveOn只会影响它下游一直到下一个obseveOn节点的线程,也就是分段的。

总结

Rxjava有点像观察者模式和责任链模式的结合,普通的观察者模式一般是被观察者通知多个观察者,而Rxjava则是被观察者通知第一个Obsever,接下来Observer依次通知其他节点的Observer,形成一个“观察链”,将观察者模式进行了一种类似链式的变换,每个节点又会执行它不同的“职责”:

最原始的订阅事件从最后一个节点开始,沿着Obsevable节点往上游传递,事件源头处理完任务后,通知给最上游的观察者,然后通知沿着Observer链条往下游传递,直到最后一个观察者结束。

flatmap

image

对事件源进行了flatmap操作,flatmap在订阅流的时候跟其他的操作符基本一致,但是在观察者回调流中却很不一样,它在回调流中做了以下内容:

  • flatmap将上游传过来的数据进行了一次变换,变成了一个Observable,如何变的是由开发者自定义的,比如图中下面三个竖着的三个Observable节点流,这条流跟上面的四个Observable节点本质上是一样的。flatmap这个节点的Obsever将上游的数据转化成了一个新的Observable流,然后执行这条新的流,当这条新的流走完时,会接着原来的观察者流继续走下去。也就是说,flatMap这个操作符将一条新的Observable节点流“插入”到原始的观察者回调流上去了。

图中的橘黄色和紫色的虚线是flatmap的一种特殊情况,当新插入的流的事件源有多个的时候,这是会产生分流,每个流都会执行一遍下游的原始节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
String[] mainArmy = {"第一大队", "第二大队", "第三大队"};
Observable.fromArray(mainArmy)
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
String[] littleArmy = {s + "的第一小队", s + "的第二小队", s + "的第三小队"};
return Observable.fromArray(littleArmy);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String little) throws Exception {
System.out.println(little);
}
});

这个代码运行结果是

1
2
3
4
5
6
7
8
9
第一大队的第一小队
第一大队的第二小队
第一大队的第三小队
第二大队的第一小队
第二大队的第二小队
第二大队的第三小队
第三大队的第一小队
第三大队的第二小队
第三大队的第三小队

把list铺平展开,而且防止了繁琐的嵌套循环。它的本质是在合并Obsevable流。

Powered by Hexo & Theme Keep
Unique Visitor Page View