1.Observables

先认识几个重要的概念

  • Observable:一个可观察对象或者被观察者
  • Observer:观察者(订阅者)
  • Subscribe:Observer的子类,比Observer多了onStart和unsubscribe两个方法

在RxJava中,一个实现了Observer接口的对象可以订阅(subscribe)一个Observable 类的实例。订阅者(subscriber)对Observable发射(emit)的任何数据或数据序列作出响应。

2.操作符Create

2.1 概要步骤:

  1. 创建被观察者 Observable observable = new Observable();
  2. 创建观察者 Observer observer = new Observer();
  3. 观察者订阅被观察者 observable.subscribe(observer);

2.2 实际使用:


1.创建被观察者
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>(){
@Override public void call(Subscriber<? super String> subscriber) {
// TODO(HeLong.W): todo somthing
}
});

2.创建观察者
Observer<String> observer = new Observer<String>() {
    @Override public void onCompleted() {
    }
    @Override public void onError(Throwable e) {
    }
    @Override public void onNext(String o) {
    }
};


3.观察者订阅被观察者
observable.subscribe(observer);

综合1、2、3来写:
Observable.create(new Observable.OnSubscribe<String>() {
    @Override public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onCompleted();
    }
}).subscribe(new Subscriber<String>() {
    @Override public void onCompleted() {
        System.out.println("onCompleted exe");
    }
    @Override public void onError(Throwable e) {
    }
    @Override public void onNext(String s) {
        System.out.println(s);
    }
});

2.3 源码分析

先看一下Observable.create()了什么

1
2
3
4
/*Observable.class*/
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}

create方法很简单,就是调用了默认的构造器,并且传入一个构造器需要的参数,参数值由hook.onCreate(f)返回,先看看这个hook.onCreate(f)是什么鬼:

1
2
3
4
5
6
7
/*Observable.class 中定义的hookd变量*/
RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
/*RxJavaObservableExecutionHook.class*/
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
return f;
}

从上面的代码可以看到hook.onCreate(f)什么也没做,就是直接返回f,接下来看看默认的构造器干什么了:

1
2
3
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}

也是很简单的给自己的成员变量onSubscribe赋值而已。那么接下来看看这个OnSubscribe是什么呢?从上面的代码看OnSubscribe是Observable的一个内部类:

1
2
3
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}

额,原来是一个接口,看他的超类定义:

1
2
3
4
5
6
7
/**
* A one-argument action.
* @param <T> the first argument type
*/
public interface Action1<T> extends Action {
void call(T t);
}

也很简洁,就是一个带有一个泛型参数的接口,那么上面的子类OnSubscribe就是指定了这个泛型的类型为Subscriber类型的,而指定的这个Subscriber就是后面要说的观察者。

看完了被观察者,再来看看观察者,一个接口类,提供了三个接口:

1
2
3
4
5
6
7
8
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}

我们上面设置观察者的时候使用的是Subscriber,那么SubscriberObserver的关系是什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public abstract class Subscriber<T> implements Observer<T>, Subscription{
@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}
@Override
public final boolean isUnsubscribed() {
return subscriptions.isUnsubscribed();
}
public void onStart() {
// do nothing by default
}
}

看上去有点复杂,多了几个方法,onStart , unsubscribe , isUnsubscribed.不多做说明,后面会说到,只需要知道SubscriberObserver是继承关系即可,都可以作为观察者。
接下来看最关键的一部分了,观察者与被观察者关联的过程:

1
2
3
4
5
6
7
/*Observable.class*/
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
return subscribe(new ObserverSubscriber<T>(observer));
}

虽然有个if判断,但是两个分支最后走的是同一个方法,所以直接看这个Subscription subscribe(Subscriber<? super T> subscriber)实现:

1
2
3
4
/*Observable.class*/
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}

额,还没走到真正处理的地方,继续跟,一大波代码来临,终于看见了曙光:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
}
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
//异常处理....
return Subscriptions.unsubscribed();
}
}

又碰到了hook,看看hook.onSubscribeStart(observable, observable.onSubscribe)这个干嘛的:

1
2
3
4
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
// pass through by default
return onSubscribe;
}

依然什么也没做,直接返回了第二个参数onSubscribe,那hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);可以替换成observable.onSubscribe.call(subscriber);,接下来关键就看这个call()做了什么?先看看调用这个call的是谁?是observable.onSubscribe,那这个observable.onSubscribe又是谁?他就是我们第一步create时设置的onSubscribe!所以这里的call做了什么,取决于我们!一般我们就在call里面先处理数据,然后调用onNext方法即可。
最后看一下还有一个return hook.onSubscribeReturn(subscriber),看一下内部实现:

1
2
3
4
public <T> Subscription onSubscribeReturn(Subscription subscription) {
// pass through by default
return subscription;
}

依然什么也没做,把参数直接返回了。那么返回的这个Subscription是干嘛用的,我们来看看:

1
2
3
4
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}

原来是用来取消订阅观察的,如果调用了unsubscribe()后,这个观察者就不再起作用了。