Rxjava2入门教程二:Observable与Observer响应式编程在Rxjava2中的典型实现

作者:编程    发布时间:2020-02-11 05:31     浏览次数 :

[返回]

本文中通过图解的方式解释Rxjava中复杂的操作符,值得收藏。其中用到的demo地址:RxJava2-Android-Samples

为避免手机阅读时,代码格式错乱,本教程中大多数代码均以图片形式展示
如需下载源码,请访问
https://github.com/fengchuanfang/Rxjava2Tutorial
文章原创,转载请注明出处:
Rxjava2入门教程二:Observable与Observer响应式编程在Rxjava2中的典型实现

Rxjava是什么

响应式编程
观察者设计模式
一个实现异步操作的库
代码托管地址

图片 1demo:Observable<List<String>> buffered = getObservable().buffer;第一个参数表示在emit数据之前,Observable需要缓存多少个数据第二个参数表示每次emit数据之后跳过几个数据。


关于响应式编程

百科:
响应式编程是一种面向数据流和变化传播的编程范式,这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
其他资料:响应式编程就是与异步数据流交互的编程范式。
个人理解(Rxjava):
相对于命令式编程/函数式编程而言,这里指在使用Rxjava过程中的对响应式编程的理解;
传统的两种是自己主动控制去得到数据,主动控制数据得流向(展示/参数),然后将数据和数据的流向代码组装起来。
Rxjava中的响应式编程是被观察者拿到数据主动传递给观察者,将展示层和数据处理层分离,解耦了各个模块,通过不同线程操控代码运作配合变换过滤等api操作实现数据流传播。

图示中就是每次buffer 2个数据之后emit,每次emit之后跳过3个数据。

在RxJava中,函数响应式编程具体表现为一个观察者(Observer)订阅一个可观察对象(Observable),通过创建可观察对象发射数据流,经过一系列操作符(Operators)加工处理和线程调度器(Scheduler)在不同线程间的转发,最后由观察者接受并做出响应的一个过程
ObservableSource与Observer是RxJava2中最典型的一组观察者与可观察对象的组合,其他四组可以看做是这一组的改进版或者简化版。

关于观察者模式

观察者订阅被观察者,被观察者主动把所处理的结果或约定的信息返回给观察者做处理(代码上其实是被观察者订阅观察者)
在android中被观察者负责数据采集、转化、处理,观察者接收处理的结果做出相应的操作

  • 举例:点击事件、Eventbus

Button的点击监听 OnClickListener 。对设置 OnClickListener 来说, Button 是被观察者, OnClickListener 是观察者,二者通过 setOnClickListener()
方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener ,执行Oclick()方法。
EventBus是一个更加强大的可以进行线程间操作观察者模式,一旦注册会将当前类作为观察者,内部通过反射机制获取接收数据结果的而各种方法,被观察者通过post发送数据

  • 观察者模式的优点

Observer模式(Rxjava)的优点是实现了展示层和数据逻辑层的分离,并定义了更新消息传递机制,提供各种类别清晰的接口,
使得可以有各种各样不同的表示层(观察者);一对多,一个被观察者可以被多个观察者监听。

图片 2demo:Observable.concat(aObservable, bObservable)第一个参数为第一个Observable第二个参数为第二个Observable图示中连接两个Observable之后,数据会连接起来,emit a1, a2, a3,b1,b2,b3图片 3

Observable

抽象类Observable是接口ObservableSource下的一个抽象实现,我们可以通过Observable创建一个可观察对象发射数据流。

图片 4

demo1_1.jpg

上例中,调用Observable.create创建一个可观察对象,并发送“Hello World”,然后通知发送完成

Rxjava的优点

异步、简洁(逻辑、代码读写)。内部支持多线程操作,强大的map和flatmap保证了依赖上一次接口数据进行二次处理时不会发生嵌套,将各个模块分离。
java1.8和第三方框架支持Lambda流式。保证了Rxjava的代码在阅读上更加简洁。
随着程序逻辑的复杂,依然保持简洁。解耦了各个模块操作,单一化,不嵌套。

lambda配置(配置jdk1.8或以上):
build.gradle(Module:app)中

android{
  ....
  compileOptions {  
  sourceCompatibility JavaVersion.VERSION_1_8    
  targetCompatibility JavaVersion.VERSION_1_8}
}

defaultConfig {   
  ....
  jackOptions {   
     enabled true   
 }
}

设置成功可以lambda的部分会变灰,鼠标移到上面显示下图,只需选中灰色部分alt+enter就可以显示lambda表达式

图片 5

Paste_Image.png

demo:getObservable() .debounce(500, TimeUnit.MILLISECONDS)第一个参数是时间间隔第二个参数是时间单位debounce表示emit数据之后一定时间内没有其他数据出现才真正emit数据。图示中emit黄球后,在规定时间内又emit绿球,则黄球不会被emit。

Observer

创建一个观察者Observer来接受并响应可观察对象发射的数据流

图片 6

demo1_2.jpg

在onNext方法中接收到可观察对象发射的数据"Hello World",并做出响应——打印到控制台。

Rxjava的实现方式

图片 7

Observer订阅Observable

图片 8

demo1_3.jpg

一旦Observer与Observable建立了订阅关系,Observer与Observable便成为了一个整体,Observer便可对Observable中的行为作出响应。

1. 依赖

compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'

defer为每一个observer创建一个ObservableSource,这样当第一个observer订阅之后如果ObservableSource中的数据发生变化,第二个订阅的Observer会得到不同的数据。

Emitter/Observer

通过Observable.create创建可观察对象时,我们可以发现具体执行发射动作的是接口ObservableEmitter的实例化对象,而ObservableEmitter<T> 继承自 接口Emitter<T>,查看源码接口Emitter的具体代码如下:

public interface Emitter<T> {
        //用来发送数据,可多次调用,每调用一次发送一条数据
     void onNext(@NonNull T value);
        //用来发送异常通知,只发送一次,若多次调用只发送第一条
        void onError(@NonNull Throwable error);
        //用来发送完成通知,只发送一次,若多次调用只发送第一条
        void onComplete();
}

onNext:用来发送数据,可多次调用,每调用一次发送一条数据
onError:用来发送异常通知,只发送一次,若多次调用只发送第一条
onComplete:用来发送完成通知,只发送一次,若多次调用只发送第一条

onError与onComplete互斥,两个方法只能调用一个不能同时调用,数据在发送时,出现异常可以调用onError发送异常通知也可以不调用,因为其所在的方法subscribe会抛出异常,若数据在全部发送完之后均正常可以调用onComplete发送完成通知;其中,onError与onComplete不做强制性调用。
接口Observer中的三个方法(onNext,onError,onComplete)正好与Emitter中的三个方法相对应,分别对Emitter中对应方法的行为作出响应。
Emitter调用onNext发送数据时,Observer会通过onNext接收数据。
Emitter调用onError发送异常通知时,Observer会通过onError接收异常通知。
Emitter调用onComplete发送完成通知时,Observer会通过onComplete接收完成通知。

2. Observer(观察者,订阅后最终执行的动作)

    Observer<String> observer = new Observer<String>() {
        @Override
        public void onNext(String s) {
            //需要执行的
            Log.d(tag, "Item: " + s);
        }
        @Override
        public void onCompleted() {
            //回调完成
            Log.d(tag, "Completed!");
        }
        @Override
        public void onError(Throwable e) {
            //回调失败
            Log.d(tag, "Error!");
        }
    };
  • Observer扩展(Subscriber)

observer是Rxjava观察者的最基本实现,Rxjava提供了另一个实现Observer的抽象类,完善了Observer的方法(onStart()、、unsubscribe())
onStart(): 在订阅之前执行(后面会讲到观察者只有在被观察者订阅了才会执行),不能被指定线程,所以不能用来操作UI,只能简单的做数据清空
unsubscribe():这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅,防止内存泄露,两种操作:1在onDestory()判断然后反订阅,2主动在observale调用onCompleted

demo:

步骤简化

去掉中间变量可以对之前的代码简化为以下形式:

图片 9

demo2.jpg

在响应式编程的基础上,加上函数式编程,真正的函数响应式编程可以将代码简化成以下形式:

图片 10

demo3.jpg

其中,just操作符是经过封装后,专门用来发射单条数据的,可以是一个数据,一条字符,一个对象,一整个数组,一整个集合。
Consumer可以看做是对观察者Observer功能单一化之后的产物——消费者,上例中的Consumer通过其函数accept只接收可观察对象发射的数据,不接收异常信息或完成信息。
如果想接收异常信息或完成信息可以用下面的代码:

图片 11

demo4.jpg

第二个参数Consumer规定泛型<Throwable>通过函数accept接收异常信息。
第三个参数Action也是对观察者Observer功能单一化之后的产物--行动,通过函数run接收完成信息,作出响应行动。

3. Observable(被观察者,订阅后决定操作什么事件)

    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
        //一旦被订阅,在Observer中回调3次onNext()和一次onCompleted()
            subscriber.onNext("a");
            subscriber.onNext("b");
            subscriber.onNext("c");
            subscriber.onCompleted();
    }
});
  • Observable创建方式扩展(just(T...)/from(T[]) / from(Iterable<? extends T>))

just(T...) 将传入的参数依次发送出来
from(T[]) / from(Iterable<? extends T> 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来
just/from是把其他类型的对象和数据类型转化成Observable

  • 举例
    Observable observable = Observable.just("a", "b", "c");
    // 将会依次调用:
    // onNext("a");
    // onNext("b");
    // onNext("c");
    // onCompleted();
Observable.defer(new Callable<ObservableSource<? extends String>>() { @Override public ObservableSource<? extends String> call() throws Exception { return Observable.just; } });

发送数据序列

Observable可以发送单条数据或者数据序列,上面的事例都是发送单条数据'Hello World"的情形,那么怎样发送数据序列呢?
可以通过最基础的方法:

图片 12

demo5.jpg

通过在方法subscribe中循环遍历String类型的集合list中的元素,然后通过emitter.onNext(str)将他们逐一发送;如果发送过程中捕获到异常,通过emitter.onError(e)发送异常信息;最后如果数据正常发送完毕调用 emitter.onComplete()发送完成通知,Observer中通过onNext接收emitter发送的每一条信息并打印到控制台(emitter发送几次,Observer便接收几次),通过onError(Throwable e)接收异常信息,onComplete()接收完成信息。
同样可以通过操作符对其进行简化,如下;

图片 13

demo6.jpg

其中fromIterable(list)也是一个封装好的操作符,可以将一个可迭代对象中的每一个元素逐一发送

4. Subscribe(订阅)

observable.subscribe(observer);   ||  observable.subscribe(subscriber);
subscribe核心源码:
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}
首先是调用传入的 subscriber.onStart 方法,该方法默认不做任何操作。之后就是调用创建Observable时保存的OnSubscriber.call方法,而在 call() 中我们调用了subscriber的 onNext() 和 onCompelte() 。这就完成了从了Observable到subscriber的数据的传递。最后返回subscriber是为了方便取消订阅等操作。

demo中可以随时改变brand的值,这样不同的Observer可能会得到不同的值。

Disposable

在之前的例子中,可以看到Observer接口中还有一个方法没有说

public void onSubscribe(Disposable d) {
}

这个方法中有个Disposable类型的参数,
onSubscribe表示在订阅时,当观察者Observer订阅可观察对象Observable,建立订阅关系后,会触发这个方法,并且会生成一个Disposable对象,其实无论观察者Observer以何种方式订阅可观察对象Observable,都会生成一个Disposable,不管有没有onSubscribe(Disposable d)方法,如下:

图片 14

demo7.jpg

查看Disposable接口的源码,如下:

public interface Disposable {
        void dispose();
        boolean isDisposed();
}

Disposable是观察者Observer与可观察对象Observable建立订阅关系后生成的用来取消订阅关系和判断订阅关系是否存在的一个接口。
只有当观察者Observer与可观察对象Observable之间存在订阅关系时,Observer才能接收Observable发送的数据或信息。如果Observer在接收Observable的信息的过程中,取消了订阅关系,则Observer只能接收订阅关系取消之前Observable发送的数据,对于订阅关系取消之后Observable发送的数据,Observer将不会再接收。
运行下面的代码,当Observable接收到第5条数据时,取消订阅关系。

图片 15

demo8.jpg

控制台日志如下:

 I/System.out: 发送0
 I/System.out: 接收0
 I/System.out: 发送1
 I/System.out: 接收1
 I/System.out: 发送2
 I/System.out: 接收2
 I/System.out: 发送3
 I/System.out: 接收3
 I/System.out: 发送4
 I/System.out: 接收4
 I/System.out: 发送5
 I/System.out: 接收5
 I/System.out: 发送6
 I/System.out: 发送7
 I/System.out: 发送8
 I/System.out: 发送9

可以发现取消订阅关系之前,Observable发送一条数据,Observe接收一条,取消订阅关系之后,Observe将不再接收Observable发送的数据。

上一篇:Rxjava2入门教程一:函数响应式编程及概述
下一篇;Rxjava2入门教程三:Operators操作符

5. 常用方法讲解

  • ActionX()
    Action1:
    public interface Action1<T> extends Action {
        void call(T t);
    }
    单参数无返回值的call方法。onNext(T obj)和onError(Throwable error)也是单参数无返回值的
    Action0:
    public interface Action0 extends Action {
        void call();
    }
    无参数无返回值的call方法 ,由于 onCompleted() 方法也是无参无返回值的
    总结:在一些情况下不需要去全部实现,其实就是observer/Subscriber的部分抽离,作为一个参数传入subscribe()实现不完整回调(完整的有三个回调方法)
  • FuncX()
    FuncX
    FuncX和ActionX类似,只是FuncX有返回值,用于observable的数据处理
    public interface Func1<T1, R> extends Function {
        public R call(T1 t1);
    }
    public interface Func2<T1, T2, R> extends Function {
        public R call(T1 t1, T2 t2);
    }
    至于它的Func0、Func1的实现只是多传了几个参数,配合map(操作符),实现observable数据的进一步处理

图片 16

6. 常用操作符讲解

  • map()

Returns an Observable that applies a specified function to each item emitted by the source Observable and emits the results of these function applications
对Observable发射的数据都应用一个函数,然后再发射最后的结果集。最后map()方法返回一个新的Observable
map配合FuncX()实现数据的进一步处理

    如:
    Observable.just("aa").map(new Func1<String, String>() {
                @Override
                public String call(String s) {
                    return s+"bb";
                }
            });
  • flatmap()

Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.
对Observable发射的数据都应用(apply)一个函数,这个函数返回一个Observable,然后合并这些Observables,并且发送(emit)合并的结果。flatMap和map操作符很相像,flatMap发送的是合并后Observables,map操作符发送的是应用函数后返回的结果集、

变换整个事件队列;
flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象, 并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。

flatMap() 的原理是这样的:

  1. 对于每传入的一个事件对象创建一个 Observable 对象,当然flatMap是操纵整个事件序列,目的是把事件序列中的对象一一取出一一进行转换,一般配合Func1<param1,param2>取单个事件,如果非要传入一个List,就把整个List对象的事件队列返回一个包括 所有事件的一个Observable,没什么意义等于还是一对一转换不符合flatmap的原理;
  2. 对于每一个创建出来的 Observable 发送的事件,都被汇入一个Observable(新的,乱序) ;解释一下:一旦数据序列执行flatmap方法后当被订阅时不是取一个事件执行一次onNext,而是所有的事件都被转化后再一一执行OnNext;如果flatmap后还有其他操作(不是flatmap(...).subscribe(..)直接调用的这种)则流程和上面一样,只不过要在每次取得事件时将其他操作执行完毕再取下一个。一旦被订阅就在在单一线程中执行(默认只有一个线程操作);如果在操纵数据时指定.subscribeOn()则多线程执行且输出乱序;
  • 举例
/**
 * Observable.from()/just方法,它接收一个集合/数组作为输入,然后每次输出一个元素给subscriber:返回的是Observable
 * func<param1,param2></> 一个函数,当应用于由源Observable发出的项时,返回一个Observable
 * @param1,前面传过来Observable里面的结果集,这里可能说法不太标准,为了便于理解,注意是结果集 
 * @param2,返回的Observable
 */
  List<Integer> integers = Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10);
        Observable.from(integers)
                .flatMap(new Func1<Integer, Observable<Integer>>() {
                    @Override
                    public Observable<Integer> call(Integer integer) {
                        Log.i("tag", "RxMethod1-----11---::" + integer);
                        return Observable.just(integer * integer);
                    }
                })
                .flatMap(new Func1<Integer, Observable<String>>() {
                    @Override
                    public Observable<String> call(final Integer integer) {
                        return Observable.create(new Observable.OnSubscribe<String>() {
                            @Override
                            public void call(Subscriber<? super String> subscriber) {
                                String s = integer + "  /  ";
                                subscriber.onNext(s);
                                Log.i("tag", "RxMethod1-----22---::" + s);
                                subscriber.onCompleted();
                            }
                        })
                        // .subscribeOn(Schedulers.io())如果把执行线程放在这里,则多线程执行,顺序也就不能保证了
                        ;
                    }
                })
                .subscribeOn(Schedulers.io())//指定被观察者执行的线程
                .observeOn(AndroidSchedulers.mainThread())//指定观察者执行的线程
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        tv.setText(str1);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.i("tag", "onError--------::" + e.toString());
                    }

                    @Override
                    public void onNext(String s) {
                        str1 = sb1.append(s).toString();
                    }
                });
        解析:
        new Func1<传入的参数, 返回的Observable的数据集>()
  • concatmap()
    和flatmap类似,但是有有顺序的从返回的observable输出到另一个observable中最后统一交给Subscribe的回调方法,永远单一线程执行,保证顺序
  • doOnSubscribe()
Observable.just(1, 2, 3, 4)
      .doOnSubscribe(....)//弹窗等等
      .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
      .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
      .subscribe(new Action1<Integer>() {
          @Override
          public void call(Integer number) {
              Log.d(tag, "number:" + number);
          }
      });
    默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。
  • doOnNext()
允许程序做一些额外的操作,可能与订阅执行结果无关,仅做一些信息或过程中某一步结果的的回传,保留,供其他地方使用
  • filter()
做信息过滤,与map一样返回结果集
  • take()
 输出最多指定数量的结果。
  • interval()
创建一个定时发射整数序列的Observable
  • compose(bindToLifecycle())
 管理生命周期, 防止内存泄露(在订阅前调用)
  • reduce()
reduce操作符实际上是把传入的list里的所有item进行两两比较或添加
  • Range()
创建发射指定范围的整数序列的Observable
  • Repeat()
创建重复发射特定的数据或数据序列的Observable
  • Timer()
创建在一个指定的延迟之后发射单个数据的Observable

distinct可以对 emit 的数据做去重处理demo:

7. 总结

本篇总结了Rxjava的基本使用流程,常用的API,操作符的理解,有不
足之处还请各位看官指教,下一篇会将Retrofit配合Rxjava实现网络请
求,以及获取数据后的各种处理。
代码托管地址

Observable.just(1, 2, 1, 1, 2, 3, 4 ,6, 4) .distinct() .subscribe(getObserver;

demo中最后emit的数据只有1,2,3,4,6

图片 17

filter按照一定的规则过滤数据demo:

 Observable.just(1, 2, 3, 4, 5, 6) .filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer % 2 == 0; } }) .subscribe(getObserver;

demo中原始数据中奇数会被过滤掉。

图片 18reduce 对所有数据进行处理,最终emit一个数据。demo: