rxJava

[TOC]

RxJava是ReactiveX在JVM上的实现,ReactiveX可以利用可观察序列和LINQ风格查询操作符来编写异步和基于时间的程序。使用Rx可以通过Observables表示异步数据流,使用LINQ操作符查询异步数据流,用Schedulers参数化异步数据流的处理。可以理解为Rx结合了观察者模式、迭代器模式和函数式编程的特点。

Observable

可以理解为观察者模式中的被观察者,会异步的发出事件序列,比如网络请求或者IO等,都可以封装为一个Observable。RxJava将很多Rx提供的操作符实现为了函数,可以通过这些函数对Observable发射出的数据进行操作,继续返回一个Observable对象,这些操作函数可以简化对Observable对象的处理。

Observable类型

  • Flowable,支持背压,当观察者处理发射数据处理不完时,可以执行一些策略,比如抛出错误或者丢弃一些数据。
  • Single,只发射一个数据或者错误通知。
  • Observable,可以发射不确定数量的数据。
  • Maybe,可能发射一个或者不发射数据。
  • Completable,用于Observable在完成某件事不发射数据时。

常用操作符

  • create 用于创建一个Observable,给这个操作符传递一个接收观察者作为参数的函数,编写这个函数让它的行为表现为一个Observable–恰当的调用观察者的onNext,onError和onCompleted方法。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    private static Observable createFirstObservable() {
    return Observable.create(new OnSubscribe<Integer>() {

    @Override
    public void call(Subscriber<? super Integer> subscriber) {
    try {
    if (!subscriber.isUnsubscribed()) {
    for (int i = 0; i < 5; i++) {
    subscriber.onNext(i);
    }
    subscriber.onCompleted();
    }
    } catch (Exception e) {
    subscriber.onError(e);
    }
    }
    });
    }
  • 将其他种类的对象和数据类型转换为一个Observable, 可以转换Future、Iterable和数组,产生的Observable会发射Iterable或数组的每一项数据。

    1
    2
    3
    4
    5
    private static Observable createObservableByFrom(){
    Integer[]data = {0,1,2,4,6,8};
    Observable observable = Observable.from(data);
    return observable;
    }
  • just操作符将单个数据转换为发射那个数据的Observable,与from不同,just不会取出数组或者Iterable中的数据逐个发射,而是一整个发射。如下面代码所示,如果我们输出Observable的数据项的size,输出为2,可见是把这个List作为一个数据项。

    1
    2
    3
    4
    5
    6
    7
    8
    private static Observable createObservableByJust() {
    Student s1 = new Student("zhu", 22);
    Student s2 = new Student("long", 34);
    List<Student> data = new ArrayList<>();
    data.add(s1);
    data.add(s2);
    return Observable.just(data);
    }
  • map操作符,接收一个转换方法,对Observable发射的数据进行映射操作,返回一个转换以后的Observable。

    1
    2
    3
    4
    5
    6
    .map(new Func1() {
    @Override
    public Object call(Object o) {
    return ((Integer)o)+1;
    }
    })
  • distinct只允许发射还没有发射过的数据。RxJava将此操作符实现为一个函数,接收一个函数,此函数返回值为区分数据的key。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    private static Observable<Student> createObservableByFromForDistinct() {
    Student s1 = new Student("zhu", 22);
    Student s2 = new Student("long", 34);
    Student s3 = new Student("zhu", 34);
    List<Student> data = new ArrayList<>();
    data.add(s1);
    data.add(s2);
    data.add(s3);
    return Observable.from(data);
    }
    public static void main(String[] args) {
    createObservableByFromForDistinct().distinct(new Func1<Student, Object>() {
    @Override
    public Object call(Student student) {
    return student.name;
    }
    }).subscribe(new Action1() {
    @Override
    public void call(Object o) {
    System.out.println(o.toString());
    }
    });
    }

    Student{name=’zhu’, age=’22’}
    Student{name=’long’, age=’34’}
    onComplete

  • 其他操作符,filter用于对Observable发射的数据进行过来,接收的参数为一个谓词测试语句,只发射通过测试的数据;take操作符可以发送前面N项数据,忽略后面的数据。

observeOn

指定一个观察者在哪个调度器上观察这个Observable,这个观察者的onNext、OnCompleted和onError方法会在指定类型线程运行。

subscribe

操作符是连接观察者和Observable的胶水。一个观察者要想看到Observable发射的数据项,或者想要从Observable获取错误和完成通知,它首先必须使用这个操作符订阅那个Observable。这个方法接收三个方法或者实现了这三个方法的接口的对象。onNext在Observable发射一条数据时调用;OnError, Observable调用这个方法表示它无法生成期待的数据或者遇到了其它错误; onCompleted,如果没有遇到任何错误,Observable在最后一次调用onCompleted之后会调用这个方法。subscribe也可以接受一到三个函数,分别解释为:

  • onNext
  • onNext和onError
  • onNext, onError和onCompleted

subscribeOn

指定Observable自身在哪个调度器执行。

文章作者: hohnor
文章链接: http://www.zhulk3.cn/2022/03/31/rxJava/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 甜茶不贵