[TOC]
RxJava是ReactiveX在JVM上的实现,ReactiveX可以利用可观察序列和LINQ风格查询操作符来编写异步和基于时间的程序。使用Rx可以通过Observables表示异步数据流,使用LINQ操作符查询异步数据流,用Schedulers参数化异步数据流的处理。可以理解为Rx结合了观察者模式、迭代器模式和函数式编程的特点。
Observable
可以理解为观察者模式中的被观察者,会异步的发出事件序列,比如网络请求或者IO等,都可以封装为一个Observable。RxJava将很多Rx提供的操作符实现为了函数,可以通过这些函数对Observable发射出的数据进行操作,继续返回一个Observable对象,这些操作函数可以简化对Observable对象的处理。
Observable类型
- Flowable,支持背压,当观察者处理发射数据处理不完时,可以执行一些策略,比如抛出错误或者丢弃一些数据。
- Single,只发射一个数据或者错误通知。
- Observable,可以发射不确定数量的数据。
- Maybe,可能发射一个或者不发射数据。
- Completable,用于Observable在完成某件事不发射数据时。
常用操作符
create 用于创建一个Observable,给这个操作符传递一个接收观察者作为参数的函数,编写这个函数让它的行为表现为一个Observable–恰当的调用观察者的onNext,onError和onCompleted方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private static Observable createFirstObservable() {
return Observable.create(new OnSubscribe<Integer>() {
public void call(Subscriber<? super Integer> subscriber) {
try {
if (!subscriber.isUnsubscribed()) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
});
}将其他种类的对象和数据类型转换为一个Observable, 可以转换Future、Iterable和数组,产生的Observable会发射Iterable或数组的每一项数据。
1
2
3
4
5private static Observable createObservableByFrom(){
Integer[]data = {0,1,2,4,6,8};
Observable observable = Observable.from(data);
return observable;
}just操作符将单个数据转换为发射那个数据的Observable,与from不同,just不会取出数组或者Iterable中的数据逐个发射,而是一整个发射。如下面代码所示,如果我们输出Observable的数据项的size,输出为2,可见是把这个List作为一个数据项。
1
2
3
4
5
6
7
8private static Observable createObservableByJust() {
Student s1 = new Student("zhu", 22);
Student s2 = new Student("long", 34);
List<Student> data = new ArrayList<>();
data.add(s1);
data.add(s2);
return Observable.just(data);
}map操作符,接收一个转换方法,对Observable发射的数据进行映射操作,返回一个转换以后的Observable。
1
2
3
4
5
6.map(new Func1() {
@Override
public Object call(Object o) {
return ((Integer)o)+1;
}
})distinct只允许发射还没有发射过的数据。RxJava将此操作符实现为一个函数,接收一个函数,此函数返回值为区分数据的key。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23private static Observable<Student> createObservableByFromForDistinct() {
Student s1 = new Student("zhu", 22);
Student s2 = new Student("long", 34);
Student s3 = new Student("zhu", 34);
List<Student> data = new ArrayList<>();
data.add(s1);
data.add(s2);
data.add(s3);
return Observable.from(data);
}
public static void main(String[] args) {
createObservableByFromForDistinct().distinct(new Func1<Student, Object>() {
public Object call(Student student) {
return student.name;
}
}).subscribe(new Action1() {
public void call(Object o) {
System.out.println(o.toString());
}
});
}Student{name=’zhu’, age=’22’}
Student{name=’long’, age=’34’}
onComplete其他操作符,filter用于对Observable发射的数据进行过来,接收的参数为一个谓词测试语句,只发射通过测试的数据;take操作符可以发送前面N项数据,忽略后面的数据。
observeOn
指定一个观察者在哪个调度器上观察这个Observable,这个观察者的onNext、OnCompleted和onError方法会在指定类型线程运行。
subscribe
操作符是连接观察者和Observable的胶水。一个观察者要想看到Observable发射的数据项,或者想要从Observable获取错误和完成通知,它首先必须使用这个操作符订阅那个Observable。这个方法接收三个方法或者实现了这三个方法的接口的对象。onNext在Observable发射一条数据时调用;OnError, Observable调用这个方法表示它无法生成期待的数据或者遇到了其它错误; onCompleted,如果没有遇到任何错误,Observable在最后一次调用onCompleted
之后会调用这个方法。subscribe也可以接受一到三个函数,分别解释为:
- onNext
- onNext和onError
- onNext, onError和onCompleted
subscribeOn
指定Observable自身在哪个调度器执行。