观察者、被观察者、订阅
1.创建Observable
Observable的创建操作符:
比如: create(),from(),just(),repeat(),defer(),rangeinterval(),和timer()等等
2.创建Observer
Observer用于处理Observable发送过来的各类事件。
可以用Operators(操作符)对事件进行各种拦截和操作。
RxJava还创建了一个继承了Observer的抽象类:Subscriber:
Subscriber进行了一些扩展,基本使用方式是一样的,这也是以后我们主要用到的一个类
3.Subscribe订阅
通过subscribe()方法订阅,把observable和observer关联起来订阅后,observable就会调用observer的onNext()、onCompleted()、onError()等方法。
下面是使用create方法创建被观察者,并创建观察者、订阅的一个例子:
Observable<String> observable=Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello world");
subscriber.onNext("hello RxJava");
subscriber.onCompleted();
}
});
Observer<String> observer=new Observer<String>() {
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted: 观察者执行");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: "+s);
}
};
observable.subscribe(observer);
使用from方法可以快速创建一个事件队列:
String[] array=new String[]{"hello world","hello RxJava","hello shopkeeper"};
Observable<String> observable=Observable.from(array);
Observer<String> observer=new Observer<String>() {
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted: 观察者执行");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: "+s);
}
};
observable.subscribe(observer);
使用just方法可以直接传入事件:
Observable<String> observable=Observable.just("hello world","hello RxJava","hello shopkeeper");
Observer<String> observer=new Observer<String>() {
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted: 观察者执行");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: "+s);
}
};
observable.subscribe(observer);
创建不完全的回调:
Observable<String> observable=Observable.just("hello world","hello RxJava","hello shopkeeper");
Action1<String> onNextAction1=new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call: "+s);
}
};
Action1<Throwable> onErrorAction=new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.d(TAG, "call: onError "+ throwable.getMessage());
}
};
Action0 onCompletedAction= new Action0() {
@Override
public void call() {
Log.i(TAG, "call: completed");
}
};
observable.subscribe(onNextAction1,onErrorAction,onCompletedAction);