RxJava

2018/06/19 Java

简介

RxJava 是一个 java 异步库,采用了观察者模式。

概念

Observable: 被观察者,也可称为发射源 Observer: 观察者,也可称为接收源 Subscriber: 订阅者,是一个特殊的观察者,可以使用 unsubscribe() 取消订阅

基本用法

Observable 创建

//使用 create 方法
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    public void subscribe(ObservableEmitter<Integer> emitter) {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();//调用 onComplete 后还会执行后续代码,但是观察者接收不到
        emitter.onNext(4);
    }
});
//使用 just,自动调用 onNext 发送数据
Observable<Integer> just = Observable.just(1, 3, 4);
//使用 from,按照顺序发送 list 中元素
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
Observable<Integer> from = Observable.from(list);  //遍历list 每次发送一个
//使用defer( ),有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable:
Observable defer = Observable.defer(() -> Observable.just("deferObservable"));
//创建一个按固定时间间隔发射整数序列的Observable
Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS);
//创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常
Observable<Integer> range = Observable.range(10, 5);
//在一个给定的延迟后发射一个特殊的值
Observable<Long> timer = Observable.timer(3, TimeUnit.SECONDS);
//重复发送
Observable<String> repeat = Observable.just("repeatObservable").repeat(3);//重复发射3次

Observer 创建

Observer<Integer> observer = new Observer<Integer>() {
    public void onSubscribe(Disposable disposable) {
        log.debug("subscribe");
    }

    public void onNext(Integer value) {
        log.debug("" + value);
    }

    public void onError(Throwable throwable) {
        log.error("error");
    }

    public void onComplete() {
        log.debug("complete");
    }
};

Search

    Table of Contents