RxJava 1.x 入门
RxJava 1.x 入门
这篇文章基本上就是在抄袭 扔物线 的 给 Android 开发者的 RxJava 详解 一文,外加了少许笔记。
响应式编程模式(reactive pattern)
RxJava GitHub: ReactiveX/RxJava: RxJava ; 通过切换分支来查看不同版本。
RxJava 1.x:
扔物线 给 Android 开发者的 RxJava 详解,这里是某学姐对此文章的理解 RxJava学习总结 - 某学姐
Rxjava 从入门到开发,让 Rxjava 学习更加简单 - Android - 掘金
《ReactiveX/RxJava文档中文版》Gitbook
一个项目:Hello-RxJava: 可能是学习Rxjava最好的教程之一
RxJava 2.x :
回头吧去学 RxJava 2.x
RxJava 2.x 是按照Reactive-Streams specification规范完全的重写的,并且完全独立于RxJava 1.x 而存在。
RxJava介绍
ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发。 Rx库现在已经支持几乎全部的流行编程语言。
微软给的定义是,Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流, 用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:
Rx = Observables + LINQ + Schedulers
ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。
Rx扩展了观察者模式用于支持数据和事件序列,添加了一些操作符,它让你可以声明式的组合这些序列,而无需关注底层的实现:如线程、同步、线程安全、并发数据结构和非阻塞IO。
使用观察者模式:
- 创建:Rx可以方便的创建事件流和数据流
- 组合:Rx使用查询式的操作符组合和变换数据流
- 监听:Rx可以订阅任何可观察的数据流并执行操作
好吧看到这里,我只能说 Rx 真的很强大。
RxJava 的几个基本概念
- Observable (可观察对象)
- Observer (观察者)
- subscribe (订阅)
- 事件
程序的观察者模式和这种真正的『观察』略有不同,观察者不需要时刻盯着可观察对象(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用注册(Register)或者称为订阅(Subscribe)的方式,告诉可观察对象:我需要你的某某状态,你要在它变化的时候通知我。
采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。
RxJava的基本实现
一个观察者(Observer)订阅一个可观察对象(Observable)。之后可观察对象(Observable)通过调用观察者的方法(比如下文中的onNext()
方法),以此来发射数据或通知给它的观察者。
RxJava 的基本实现主要有三点:
1.创建 Observer(观察者)或Subscriber (订阅者)
它决定事件触发的时候将有怎样的行为。Observer 接口:
//观察者
Observer<String> observer = new Observer<String>() {
//Observable(可观察对象)通过调用此方法来发射数据,方法的参数就是发射的数据;
//该方法可以被调用多次。
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
//Observable(可观察对象)通过调用此方法来发送通知,表示正常终止
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
//Observable(可观察对象)通过调用此方法来发送通知,表示自己遇到错误
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
除了 Observer 接口之外,RxJava 还内置了一个特殊的观察者接口:Subscriber (订阅者)。
Subscriber (订阅者),它扩展了Observer:
-
onStart()
方法:在事件还未发送之前被调用,可以做一些准备工作。 -
unsubscribe()
: 这是 Subscriber 所实现的另一个接口 Subscription(订阅)接口 的方法,用于取消订阅。最好在适当的生命周期函数中调用此方法,以避免内存泄漏。
取消订阅的结果会传递给这个Observable的操作符链,而且会导致这个链条上的每个环节都停止发射数据项。这些并不保证会立即发生。
补充:关于RxJava1中的Subscription的一些误解 - 简书
2. 创建 Observable(可观察对象)
Observable 即可观察对象,它决定什么时候触发事件以及触发怎样的事件。
可调用 Observable 的 create()
方法返回一个 Observable对象:
//注意参数 Onsubscribe 对象
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
// 调用 subscriber 的 相关方法
// 感觉调用 onNext 方法就是指 发射数据
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
OnSubscribe:一个接口类,是连接可观察对象和观察者的桥梁,另外要说明的是onSubscribe是Observable的一个局部变量
TODO:还需补充该接口具体描述。
创建操作符: Create(),该方法也是一个操作符。
通过调用哪个方法发射数据?
Observable什么时候开始发射数据序列?
这取决于Observable的实现:
- 有一种实现是在Observable一创建完就开始发射数据,这可能导致观察者无法接收在Observable调用subscribe()方法之前发射的数据。
- 还有一些Observable在创建后会处于等待状态,直到有观察者订阅它才开始发射数据。
- 还有其它的一些 Observable 实现。
3.Subscribe (v.订阅)
订阅方式一:
Observable(可观察对象) 通过subscribe()
方法和Subscriber(订阅者)实现了订阅关系(两者建立了联系)。
Observable.subscribe(Observer observer)
下面是将 Observable(可观察对象)的create()
和subscribe()
写在同一条语句中的示例:
Observable.create(new Observable.OnSubscribe(){
@Override
public void call(Subscriber subscriber) {
for(int i=0;i<3;i++){
subscriber.onnext(i);
}
subscriber.oncompleted();
})
//开始订阅
.subscribe(new="" subscriber() {
@Override
public void onCompleted() {
Log.i(Log.TAG,"hello rxjava execute complete");
}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(Integer integer) {
Log.i(Log.TAG,""+integer);
}
}
);
Observable.subscribe(Subscriber subscriber)
的主要逻辑:
//注意比较函数的返回值
public Subscription subscribe(Subscriber subscriber) {
//...
// 前面介绍过的 onStart方法
subscriber.onStart();
//这表明在被订阅的时候开始事件发送的逻辑
onSubscribe.call(subscriber);
// 返回 subscriber,便于日后 unsubscribe()
return subscriber;
}
订阅方式二:
Observable.subscribe(Action*, ...)
Action
这种方式,里面实现也还是用Subscriber进行了包装,本质上就是上面Subscriber的那种方式。只不过根据传入的参数不同回调的方法不同而已。
实际过程是这样:
1.先创建一个或多个 Action* 的对象
// 先创建 Action* 的对象,这里创建了三个 Action* 对象
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
2.再将它们传递给重载的observable.subscribe()
方法,这些重载的方法看起来像这样:
Subscription subscribe(final Action1<? super T> onNext)
Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError)
Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete)
传递的过程像这样:
// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
3.RxJava之后会自动根据重载的subscribe(action*,...)
方法参数的个数创建出对应的 subscriber。
逻辑看起来像这样:
public * subscribe(Action action, ...) {
// 1.
//先将 Action 转换为 subscriber
// 2.
//再进行与 subscribe(Subscriber subscriber) 相似的逻辑
//或者直接再调用 subscribe(Subscriber subscriber) 方法
return *;
}
RxJava
提供了Action0- Action9
和ActionN
这几个接口;Action只有一个call(...)
方法; 对于Action3,这里的数字3表示它的call()
方法的参数的个数为3个。可以将Action看成包装对象,他的包装对象是一个没有返回值的方法。比如现在要将subscriber的 onError(Throwable throwable)
方法包装成一个 action对象,那么可以像下面这样(注意: 前提是该方法没有返回值):
//由于 onError(Throwable throwable) 是带有1个参数的所以我们需要用 Action1 来对其进行包装
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError(Throwable throwable)
@Override
public void call(Throwable throwable) {
// Error handling
// onError(Throwable throwable) 方法需要包含的逻辑在这里实现
}
};
关于Subscription
这个接口,这个类提供了两个方法unsubscribe()
和isUnsubscribed()
,可以解除订阅关系和判断订阅关系。subscribe()
订阅方法的返回值也是Subscription
。
还记得前面说过 Subscriber(订阅者)的unsubscribe()
方法 是 Subscriber 所实现的另一个接口 Subscription(订阅)接口 的方法。
subscribe:订阅(动词) 、Subscription:订阅(名词) 、 Subscriber:订阅者。其中Subscriber的一个实现:
public final class SerializedSubscriber<T> implements FlowableSubscriber<T>, Subscription{}
RxJava操作符
Rx提供了一系列的操作符,你可以使用它们来过滤(filter)、选择(select)、变换(transform)、结 合(combine)和组合(compose)多个Observable,这些操作符让执行和复合变得非常高效。
对于RxJava的使用,最重要的还是对于操作符的学习,熟悉了操作符才能更好的使用RxJava。
简单来说:操作符就是Observable的各种操作,例如:创建,变换,过滤操作等等。在这里需要强调下的是,Observable通过操作符的操作之后会得到一个新的Observable,每创建一个操作符,简单来说就是创建了一个子任务。
操作符让你可以变换、组合、操纵和处理Observable发射的数据。
操作符可以分为多种类型:
- 创建操作:用于创建Observable的操作符
- 变换操作:这些操作符可用于对Observable发射的数据进行变换
- 过滤操作:这些操作符用于从Observable发射的数据中进行选择
- 组合操作:组合操作符用于将多个Observable组合成一个单一的Observable
- 错误处理:这些操作符用于从错误通知中恢复
- 辅助操作:一组用于处理Observable的操作符
- 条件和布尔操作:这些操作符可用于单个或多个数据项,也可用于Observable
- 算术和聚合操作:这些操作符可用于整个数据序列
- 连接操作:一些有精确可控的订阅行为的特殊Observable
- 转换操作
几种主要的需求:
- 直接创建一个Observable(创建操作)
- 组合多个Observable(组合操作)
- 对Observable发射的数据执行变换操作(变换操作)
- 从Observable发射的数据中取特定的值(过滤操作)
- 转发Observable的部分值(条件/布尔/过滤操作)
- 对Observable发射的数据序列求值(算术/聚合操作)
创建操作
create
通过调用观察者的方法从头创建一个Observable
just
将对象或者对象集合转换为一个能够发射这些对象的Observable
Just类似于From,但是From会将数组或Iterable的数据取出然后逐个发射,而Just只是简单的原样发射,将数组或Iterable当做单个数据。
RxJava将这个操作符实现为just
函数,它接受一至九个参数,返回一个按参数列表顺序发射这些数据的Observable。
//不要与 from 混淆,这里是传递 3 个参数给 just()
Observable.just(1, 2, 3)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
输出:
Next: 1
Next: 2
Next: 3
Sequence complete.
from
将其它的对象或数据结构转换为Observable
在RxJava中, from 操作符可以转换Future、Iterable和数组。对于Iterable和数组,产生的 Observable会发射Iterable或数组的每一项数据。
Integer[] items = { 0, 1, 2, 3, 4, 5 };
//数组中的内容将被一个一个发送出去
Observable myObservable = Observable.from(items);
myObservable.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer item) {
//接收并打印 Observable 发射的每一项数据
System.out.println(item);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable error) {
System.out.println("Error encountered: " + error.getMessage());
}
},
new Action0() {
@Override
public void call() {
System.out.println("Sequence complete");
}
}
);
输出结果:
0
1
2
3
4
5
Sequence complete
变换操作
在RxJava的内部实现中,变换操作都是基于同一个基础的变换方法
lift(Operator)
。
map
map为一对一变换。一个对象 -> 另一个对象
或者 一个数组 -> 另一个数组
。
Map操作符对原始Observable发射的每一项数据应用一个你选择的函数,执行变换操作,然后返回一个发射这些结果的Observable。
依次取出传入的数组或迭代器的每一个item,然后将其作为参数传入相关函数,该函数更改了它的数据类型,并返回一个能够发射所有这些新数据的 Observable。
Observable.just("images/logo.png") // 输入类型 String
// map映射: 通过Func1接口中的call方法将 String 值映射为 Bitmap 类型的值
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 参数类型 String
return getBitmapFromPath(filePath); // 返回类型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 参数类型 Bitmap
showBitmap(bitmap);
}
});
这里 Func1
接口与 Action1
接口相似,两者的区别在于 Func1
接口包装的是有返回值的方法。Func*
也有多个用于不同参数个数的方法。
flatmap
FlatMap 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。
FlatMap
操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap
合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。
这个方法是很有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的Observable
发射这些次级Observable
发射的数据的完整集合。
注意:FlatMap
对这些Observables发射的数据做的是合并(merge
)操作,因此它们可能是交错的。(这句话说的不明不白,这里的merge类似与 git中的merge吗?)如果需要按顺序连接可以使用 concatMap 操作符。
注意:如果任何一个通过这个flatMap
操作产生的单独的Observable
(新Observable)调用onError
异常终止了,这个Observable(旧Observable)自身会立即调用onError
并终止。
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
//把 Student 转换为 Observable
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
flatMap()
和 map()
有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map()
不同的是, flatMap()
中返回的是个 Observable
对象,并且这个 Observable
对象并不是被直接发送到了 Subscriber
的回调方法中。 flatMap()
的原理是这样的:1. 使用传入的事件对象创建一个 Observable
对象;2. 并不发送这个 Observable
, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable
发送的事件,都被汇入同一个 Observable
,而这个 Observable
负责将这些事件统一交给 Subscriber
的回调方法。
示例:
以Person为例,一个Person对应一个身份证id,一个Person可以有多个Email。通过map()
可以将Person转换成id,从而得到一个Person的身份证号码;通过flatMap()
可以将 Person转换成一组Email,从而得到一个Person的所有Email。
/**
* map: Person -> id(String)
* 打印某个人id
*/
private void testMap0() {
Observable.just(getPersonArray()[0])
.map(new Func1<Person, String>() {
@Override
public String call(Person person) {
return person.id;
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String id) {
Log.d(TAG, "id -> " + id);
}
});
}
/**
* map: array Person -> id(String)
* 打印每个人的id
*/
private void testMap() {
Observable.from(getPersonArray())
.map(new Func1<Person, String>() {
@Override
public String call(Person person) {
return person.id;
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String id) {
Log.d(TAG, "id -> " + id);
}
});
}
/**
* flatMap: array Person -> email数组(String[])
* 打印每个人的所有email
*/
private void testFlatMap() {
Observable.from(getPersonArray())
.flatMap(new Func1<Person, Observable<Person.Email>>() {
@Override
public Observable<Person.Email> call(Person person) {
Log.d(TAG, "flatMap " + person.id);
return Observable.from(person.emails);
}
})
.subscribe(new Subscriber<Person.Email>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError " + e.getMessage());
}
@Override
public void onNext(Person.Email email) {
Log.d(TAG, "onNext " + email.name);
}
});
}
补充:背压
背压,大概就是指在异步场景中,被观察者发送事件的速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。在差距太大的时候,我们的内存会猛增,直到OOM。
参考:
使用调度器 Scheduler 进行线程控制
如果你想给Observable操作符链添加多线程功能,你可以指定操作符(或者特定的 Observable)在特定的调度器(Scheduler)上执行。
Observable(可观察对象)提供了两个方法来设置调度器:
- subscribeOn():指定
subscribe()
所发生的线程,即Observable.OnSubscribe
被激活时所处的线程。或者叫做事件产生的线程。 - observeOn(): 指定
Subscriber
所在的线程。或者叫做事件消费的线程。(这样更好理解)
另一种解释(恕我愚昧,这两种解释到底有没有区别?):
- subscribeOn(): 它指示Observable将全部的处理过程(包括发射数据和通知)放在特定的调度器上执行。事件产生的线程。
- observeOn(): 指定 Observable 在一个特定的调度器上调用 Observe(观察者)的相关方法。由上文可知是在
Observable.OnSubscribe<T>
的call()
方法中调用Observe(观察者)的相关方法,那么这里说明的就是该call()
方法中的onNext
、onError
和onCompleted
执行时所在的调度器(那么调用call()
方法的这个语句是在哪个线程?并且看起来是只有call方法中的上述三条语句在observeOn()指定的线程上运行)。事件消费的线程。
根据打印出的Log来看:
- subscribeOn():指定
OnSubscribe.call()
的执行线程,即Observable通知Subscriber的线程; - observeOn():指定Subscriber回调(
onNext
、onError
和onCompleted
)的执行线程。
或者直接这样更好理解:
- subscribeOn(): 它指示Observable对象执行时所在的线程
- observeOn(): 指定 Observe或Subscriber 对象执行时所在的线程。
RxJava包含了多种调度器,常见的几种:
- Schedulers.computation( ):用于计算任务。指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作
- Schedulers.io( ):用于IO密集型任务;可以进行文件、数据库和网络操作
- Schedulers.newThread( ):为每个任务创建一个新线程
- Schedulers.immediate( ):在当前线程立即开始执行任务
- 对于Android 还有一个专用的
AndroidSchedulers.mainThread()
,它指定的操作将在 Android 主线程运行
示例:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.d(TAG, "OnSubscribe.call Thread -> " + Thread.currentThread().getName());
subscriber.onNext("message");
}
})
// 设置 observable 对象在 io 线程运行
.subscribeOn(Schedulers.io())
// 设置 observe 对象在 Android 主线程运行
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "Subscriber.onNext Thread -> " + Thread.currentThread().getName());
}
});
多次切换线程
各操作符都会被指定一个默认的调度器,具体见 《ReactiveX/RxJava文档中文版》
observeOn()
指定的是它之后的操作符所在的线程。操作符可以有多个,那么observeOn()
也可以调用多次。
不过,不同于 observeOn()
, subscribeOn()
的位置放在哪里都可以,但它是只能调用一次的。
当使用了多个 subscribeOn()
的时候,只有第一个 subscribeOn()
起作用。
另可见: 一个项目:Hello-RxJava: 可能是学习Rxjava最好的教程之一 ,这里还使用图片对调度操作进行了展示。
doOnSubscribe()
然而,虽然超过一个的 subscribeOn()
对事件处理的流程没有影响,但在流程之前却是可以利用的。
在前面讲 Subscriber
的时候,提到过 Subscriber
的 onStart()
可以用作流程开始前的初始化。然而 onStart()
由于在 subscribe()
发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe()
被调用时的线程。这就导致如果 onStart()
中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 subscribe()
将会在什么线程执行。
而与 Subscriber.onStart()
相对应的,有一个方法 Observable.doOnSubscribe()
。它和 Subscriber.onStart()
同样是在 subscribe()
调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe()
执行在 subscribe()
发生的线程;而如果在 doOnSubscribe()
之后有 subscribeOn()
的话,它将执行在离它最近的 subscribeOn()
所指定的线程。
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
如上,在 doOnSubscribe()
的后面跟一个 subscribeOn()
,就能指定准备工作的线程了。
RxJava使用场景
- RxJava与Retrofit的结合
- RxBinding
- RxBus
RxJava 2.x
RxJava 2.x 是按照Reactive-Streams specification规范完全的重写的,完全独立于RxJava 1.x 而存在。