RxJava 1.x 入门

Published: by Creative Commons Licence (Last updated: )

RxJava 1.x 入门

这篇文章基本上就是在抄袭 扔物线 的 给 Android 开发者的 RxJava 详解 一文,外加了少许笔记。

响应式编程模式(reactive pattern)

RxJava GitHub: ReactiveX/RxJava: RxJava ; 通过切换分支来查看不同版本。

RxJava 1.x:

扔物线 给 Android 开发者的 RxJava 详解,这里是某学姐对此文章的理解 RxJava学习总结 - 某学姐

Rxjava 从入门到开发,让 Rxjava 学习更加简单 - Android - 掘金

《ReactiveX/RxJava文档中文版》Gitbook

一个项目:Hello-RxJava: 可能是学习Rxjava最好的教程之一

RxJava 2.x :

一个项目:RxJava2-Android-Samples

回头吧去学 RxJava 2.x

RxJava 2.x 是按照Reactive-Streams specification规范完全的重写的,并且完全独立于RxJava 1.x 而存在。

RxJava介绍

ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发。 Rx库现在已经支持几乎全部的流行编程语言。

微软给的定义是,Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流, 用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:

Rx = Observables + LINQ	+ Schedulers

ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式迭代器模式函数式编程的精华。

Rx扩展了观察者模式用于支持数据和事件序列,添加了一些操作符,它让你可以声明式的组合这些序列,而无需关注底层的实现:如线程、同步、线程安全、并发数据结构和非阻塞IO。

使用观察者模式:

  • 创建:Rx可以方便的创建事件流和数据流
  • 组合:Rx使用查询式的操作符组合和变换数据流
  • 监听:Rx可以订阅任何可观察的数据流并执行操作

好吧看到这里,我只能说 Rx 真的很强大。

RxJava 的几个基本概念

  • Observable (可观察对象)
  • Observer (观察者)
  • subscribe (订阅)
  • 事件

程序的观察者模式和这种真正的『观察』略有不同,观察者不需要时刻盯着可观察对象(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用注册(Register)或者称为订阅(Subscribe)的方式,告诉可观察对象:我需要你的某某状态,你要在它变化的时候通知我。

采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。

RxJava的基本实现

一个观察者(Observer)订阅一个可观察对象(Observable)。之后可观察对象(Observable)通过调用观察者的方法(比如下文中的onNext()方法),以此来发射数据或通知给它的观察者。

RxJava 的基本实现主要有三点:

1.创建 Observer(观察者)或Subscriber (订阅者)

它决定事件触发的时候将有怎样的行为。Observer 接口:

//观察者
Observer<String> observer = new Observer<String>() {
    //Observable(可观察对象)通过调用此方法来发射数据,方法的参数就是发射的数据;
  	//该方法可以被调用多次。
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    //Observable(可观察对象)通过调用此方法来发送通知,表示正常终止
    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    //Observable(可观察对象)通过调用此方法来发送通知,表示自己遇到错误
    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

除了 Observer 接口之外,RxJava 还内置了一个特殊的观察者接口:Subscriber (订阅者)

Subscriber (订阅者),它扩展了Observer:

  • onStart()方法:在事件还未发送之前被调用,可以做一些准备工作。

  • unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription(订阅)接口 的方法,用于取消订阅。最好在适当的生命周期函数中调用此方法,以避免内存泄漏。

取消订阅的结果会传递给这个Observable的操作符链,而且会导致这个链条上的每个环节都停止发射数据项。这些并不保证会立即发生。

补充:关于RxJava1中的Subscription的一些误解 - 简书

2. 创建 Observable(可观察对象)

Observable 即可观察对象,它决定什么时候触发事件以及触发怎样的事件。

可调用 Observable 的 create()方法返回一个 Observable对象:

//注意参数 Onsubscribe 对象
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        // 调用 subscriber 的 相关方法
      	// 感觉调用 onNext 方法就是指 发射数据 
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

OnSubscribe:一个接口类,是连接可观察对象和观察者的桥梁,另外要说明的是onSubscribe是Observable的一个局部变量

TODO:还需补充该接口具体描述。

创建操作符: Create(),该方法也是一个操作符。

通过调用哪个方法发射数据?

Observable什么时候开始发射数据序列?
这取决于Observable的实现:

  • 有一种实现是在Observable一创建完就开始发射数据,这可能导致观察者无法接收在Observable调用subscribe()方法之前发射的数据。
  • 还有一些Observable在创建后会处于等待状态,直到有观察者订阅它才开始发射数据。
  • 还有其它的一些 Observable 实现。

3.Subscribe (v.订阅)

订阅方式一:

Observable(可观察对象) 通过subscribe()方法和Subscriber(订阅者)实现了订阅关系(两者建立了联系)。

Observable.subscribe(Observer observer)

下面是将 Observable(可观察对象)的create()subscribe()写在同一条语句中的示例:

Observable.create(new Observable.OnSubscribe(){

    @Override
    public void call(Subscriber subscriber) {
        for(int i=0;i<3;i++){ 
                subscriber.onnext(i);
            }
            subscriber.oncompleted();
    })
    //开始订阅
    .subscribe(new="" subscriber() {
        @Override
        public void onCompleted() {
            Log.i(Log.TAG,"hello rxjava execute complete");
        }

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onNext(Integer integer) {
            Log.i(Log.TAG,""+integer);
        }
    }
    );

Observable.subscribe(Subscriber subscriber)的主要逻辑:

//注意比较函数的返回值
public Subscription subscribe(Subscriber subscriber) {
    //...
   // 前面介绍过的 onStart方法
    subscriber.onStart();
    //这表明在被订阅的时候开始事件发送的逻辑
    onSubscribe.call(subscriber);
    // 返回 subscriber,便于日后 unsubscribe()
    return subscriber;
}

订阅方式二:

Observable.subscribe(Action*, ...)

Action这种方式,里面实现也还是用Subscriber进行了包装,本质上就是上面Subscriber的那种方式。只不过根据传入的参数不同回调的方法不同而已。

实际过程是这样:

1.先创建一个或多个 Action* 的对象

// 先创建 Action* 的对象,这里创建了三个 Action* 对象

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};

Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};

Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

2.再将它们传递给重载的observable.subscribe()方法,这些重载的方法看起来像这样:

Subscription subscribe(final Action1<? super T> onNext)
Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError)
Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete)

传递的过程像这样:

// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

3.RxJava之后会自动根据重载的subscribe(action*,...)方法参数的个数创建出对应的 subscriber。

逻辑看起来像这样:

public * subscribe(Action action, ...) {
  	// 1. 
	//先将 Action 转换为 subscriber
  	
  	// 2.
  	//再进行与 subscribe(Subscriber subscriber) 相似的逻辑
  	//或者直接再调用 subscribe(Subscriber subscriber) 方法
  
    return *;
}

RxJava提供了Action0- Action9ActionN这几个接口;Action只有一个call(...)方法; 对于Action3,这里的数字3表示它的call()方法的参数的个数为3个。可以将Action看成包装对象,他的包装对象是一个没有返回值的方法。比如现在要将subscriber的 onError(Throwable throwable)方法包装成一个 action对象,那么可以像下面这样(注意: 前提是该方法没有返回值):

//由于 onError(Throwable throwable) 是带有1个参数的所以我们需要用 Action1 来对其进行包装
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError(Throwable throwable)
    @Override
    public void call(Throwable throwable) {
        // Error handling
      // onError(Throwable throwable) 方法需要包含的逻辑在这里实现
    }
};

关于Subscription这个接口,这个类提供了两个方法unsubscribe()isUnsubscribed(),可以解除订阅关系和判断订阅关系。subscribe()订阅方法的返回值也是Subscription

还记得前面说过 Subscriber(订阅者)的unsubscribe()方法 是 Subscriber 所实现的另一个接口 Subscription(订阅)接口 的方法。

subscribe:订阅(动词) 、Subscription:订阅(名词) 、 Subscriber:订阅者。其中Subscriber的一个实现:

public final class SerializedSubscriber<T> implements FlowableSubscriber<T>, Subscription{}

RxJava操作符

Rx提供了一系列的操作符,你可以使用它们来过滤(filter)、选择(select)、变换(transform)、结 合(combine)和组合(compose)多个Observable,这些操作符让执行和复合变得非常高效。

对于RxJava的使用,最重要的还是对于操作符的学习,熟悉了操作符才能更好的使用RxJava。

简单来说:操作符就是Observable的各种操作,例如:创建,变换,过滤操作等等。在这里需要强调下的是,Observable通过操作符的操作之后会得到一个新的Observable,每创建一个操作符,简单来说就是创建了一个子任务。

操作符让你可以变换、组合、操纵和处理Observable发射的数据。

操作符可以分为多种类型:

  • 创建操作:用于创建Observable的操作符
  • 变换操作:这些操作符可用于对Observable发射的数据进行变换
  • 过滤操作:这些操作符用于从Observable发射的数据中进行选择
  • 组合操作:组合操作符用于将多个Observable组合成一个单一的Observable
  • 错误处理:这些操作符用于从错误通知中恢复
  • 辅助操作:一组用于处理Observable的操作符
  • 条件和布尔操作:这些操作符可用于单个或多个数据项,也可用于Observable
  • 算术和聚合操作:这些操作符可用于整个数据序列
  • 连接操作:一些有精确可控的订阅行为的特殊Observable
  • 转换操作

几种主要的需求:

  • 直接创建一个Observable(创建操作)
  • 组合多个Observable(组合操作)
  • 对Observable发射的数据执行变换操作(变换操作)
  • 从Observable发射的数据中取特定的值(过滤操作)
  • 转发Observable的部分值(条件/布尔/过滤操作)
  • 对Observable发射的数据序列求值(算术/聚合操作)

创建操作

create

通过调用观察者的方法从头创建一个Observable

just

将对象或者对象集合转换为一个能够发射这些对象的Observable

Just类似于From,但是From会将数组或Iterable的数据取出然后逐个发射,而Just只是简单的原样发射,将数组或Iterable当做单个数据。

RxJava将这个操作符实现为just函数,它接受一至九个参数,返回一个按参数列表顺序发射这些数据的Observable。

//不要与 from 混淆,这里是传递 3 个参数给 just()
Observable.just(1, 2, 3)
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

输出:

Next: 1
Next: 2
Next: 3
Sequence complete.

from

将其它的对象或数据结构转换为Observable

在RxJava中, from 操作符可以转换Future、Iterable和数组。对于Iterable和数组,产生的 Observable会发射Iterable或数组的每一项数据。

Integer[] items = { 0, 1, 2, 3, 4, 5 };
//数组中的内容将被一个一个发送出去
Observable myObservable = Observable.from(items);

myObservable.subscribe(
    new Action1<Integer>() {
        @Override
        public void call(Integer item) {
          	//接收并打印 Observable 发射的每一项数据
            System.out.println(item);
        }
    },
    new Action1<Throwable>() {
        @Override
        public void call(Throwable error) {
            System.out.println("Error encountered: " + error.getMessage());
        }
    },
    new Action0() {
        @Override
        public void call() {
            System.out.println("Sequence complete");
        }
    }
);

输出结果:

0
1
2
3
4
5
Sequence complete

变换操作

在RxJava的内部实现中,变换操作都是基于同一个基础的变换方法 lift(Operator)

map

map为一对一变换。一个对象 -> 另一个对象 或者 一个数组 -> 另一个数组

Map操作符对原始Observable发射的每一项数据应用一个你选择的函数,执行变换操作,然后返回一个发射这些结果的Observable。

依次取出传入的数组或迭代器的每一个item,然后将其作为参数传入相关函数,该函数更改了它的数据类型,并返回一个能够发射所有这些新数据的 Observable。

Observable.just("images/logo.png") // 输入类型 String
  	// map映射: 通过Func1接口中的call方法将 String 值映射为 Bitmap 类型的值
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String filePath) { // 参数类型 String
            return getBitmapFromPath(filePath); // 返回类型 Bitmap
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { // 参数类型 Bitmap
            showBitmap(bitmap);
        }
    });

这里 Func1 接口与 Action1 接口相似,两者的区别在于 Func1接口包装的是有返回值的方法。Func*也有多个用于不同参数个数的方法。

flatmap

FlatMap 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。

FlatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。

这个方法是很有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的Observable发射这些次级Observable发射的数据的完整集合

注意:FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。(这句话说的不明不白,这里的merge类似与 git中的merge吗?)如果需要按顺序连接可以使用 concatMap 操作符。

注意:如果任何一个通过这个flatMap操作产生的单独的Observable(新Observable)调用onError异常终止了,这个Observable(旧Observable)自身会立即调用onError并终止。

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};

Observable.from(students)
  	//把 Student 转换为 Observable
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);

flatMap()map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map()不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。 flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。

示例:

以Person为例,一个Person对应一个身份证id,一个Person可以有多个Email。通过map()可以将Person转换成id,从而得到一个Person的身份证号码;通过flatMap()可以将 Person转换成一组Email,从而得到一个Person的所有Email。

/**
 * map: Person -> id(String)
 * 打印某个人id
 */
private void testMap0() {
    Observable.just(getPersonArray()[0])
            .map(new Func1<Person, String>() {
                @Override
                public String call(Person person) {
                    return person.id;
                }
            })
            .subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(String id) {
                    Log.d(TAG, "id -> " + id);
                }
            });
}

/**
 * map: array Person -> id(String)
 * 打印每个人的id
 */
private void testMap() {
    Observable.from(getPersonArray())
            .map(new Func1<Person, String>() {
                @Override
                public String call(Person person) {
                    return person.id;
                }
            })
            .subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(String id) {
                    Log.d(TAG, "id -> " + id);
                }
            });
}

/**
 * flatMap: array Person -> email数组(String[])
 * 打印每个人的所有email
 */
private void testFlatMap() {
    Observable.from(getPersonArray())
            .flatMap(new Func1<Person, Observable<Person.Email>>() {
                @Override
                public Observable<Person.Email> call(Person person) {
                    Log.d(TAG, "flatMap " + person.id);
                    return Observable.from(person.emails);
                }
            })
            .subscribe(new Subscriber<Person.Email>() {
                @Override
                public void onCompleted() {
                    Log.d(TAG, "onCompleted");
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError " + e.getMessage());
                }

                @Override
                public void onNext(Person.Email email) {
                    Log.d(TAG, "onNext " + email.name);
                }
            });
}

补充:背压

背压,大概就是指在异步场景中,被观察者发送事件的速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。在差距太大的时候,我们的内存会猛增,直到OOM。

参考:

  1. Android - RxJava学习笔记 - cashow的博客
  2. 各种操作符的学习: Operators · ReactiveX文档中文翻译

使用调度器 Scheduler 进行线程控制

如果你想给Observable操作符链添加多线程功能,你可以指定操作符(或者特定的 Observable)在特定的调度器(Scheduler)上执行。

Observable(可观察对象)提供了两个方法来设置调度器:

  • subscribeOn():指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
  • observeOn(): 指定 Subscriber 所在的线程。或者叫做事件消费的线程。(这样更好理解)

另一种解释(恕我愚昧,这两种解释到底有没有区别?):

  • subscribeOn(): 它指示Observable将全部的处理过程(包括发射数据和通知)放在特定的调度器上执行。事件产生的线程。
  • observeOn(): 指定 Observable 在一个特定的调度器上调用 Observe(观察者)的相关方法。由上文可知是在 Observable.OnSubscribe<T>call()方法中调用Observe(观察者)的相关方法,那么这里说明的就是该 call() 方法中的onNextonErroronCompleted执行时所在的调度器(那么调用call()方法的这个语句是在哪个线程?并且看起来是只有call方法中的上述三条语句在observeOn()指定的线程上运行)。事件消费的线程。

根据打印出的Log来看:

  • subscribeOn():指定OnSubscribe.call()的执行线程,即Observable通知Subscriber的线程;
  • observeOn():指定Subscriber回调(onNextonErroronCompleted)的执行线程。

或者直接这样更好理解:

  • subscribeOn(): 它指示Observable对象执行时所在的线程
  • observeOn(): 指定 Observe或Subscriber 对象执行时所在的线程。

RxJava包含了多种调度器,常见的几种:

  • Schedulers.computation( ):用于计算任务。指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作
  • Schedulers.io( ):用于IO密集型任务;可以进行文件、数据库和网络操作
  • Schedulers.newThread( ):为每个任务创建一个新线程
  • Schedulers.immediate( ):在当前线程立即开始执行任务
  • 对于Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行

示例:

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        Log.d(TAG, "OnSubscribe.call Thread -> " + Thread.currentThread().getName());
        subscriber.onNext("message");
    }
})
  // 设置 observable 对象在 io 线程运行
  .subscribeOn(Schedulers.io())
  // 设置 observe 对象在 Android 主线程运行
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Subscriber<String>() {
      @Override
      public void onCompleted() {

      }

      @Override
      public void onError(Throwable e) {

      }

      @Override
      public void onNext(String s) {
          Log.d(TAG, "Subscriber.onNext Thread -> " + Thread.currentThread().getName());
      }
  });

多次切换线程

各操作符都会被指定一个默认的调度器,具体见 《ReactiveX/RxJava文档中文版》

observeOn() 指定的是它之后的操作符所在的线程。操作符可以有多个,那么observeOn() 也可以调用多次。

不过,不同于 observeOn()subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。

当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。

另可见: 一个项目:Hello-RxJava: 可能是学习Rxjava最好的教程之一 ,这里还使用图片对调度操作进行了展示。

doOnSubscribe()

然而,虽然超过一个的 subscribeOn() 对事件处理的流程没有影响,但在流程之前却是可以利用的。

在前面讲 Subscriber 的时候,提到过 SubscriberonStart() 可以用作流程开始前的初始化。然而 onStart() 由于在 subscribe()发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程。这就导致如果 onStart() 中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 subscribe() 将会在什么线程执行。

而与 Subscriber.onStart() 相对应的,有一个方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

如上,在 doOnSubscribe()后面跟一个 subscribeOn() ,就能指定准备工作的线程了。

RxJava使用场景

是时候学习RxJava了 - 简书

  • RxJava与Retrofit的结合
  • RxBinding
  • RxBus

RxJava 2.x

RxJava 2.x 是按照Reactive-Streams specification规范完全的重写的,完全独立于RxJava 1.x 而存在。