目前 RxJava 是安卓开发圈内最流行的库之一,最近项目中也用到了 RxJava, 准备将一些知识点记录下来.方便以后浏览.

1.zip 压合

有时,在 app 中需要访问不同接口,然后将结果整合到一块进行输出(如将第三方广告 API 的广告夹杂进自家平台返回的数据 List 中),这种并行的异步处理比较麻烦,不用使用 zip 之后会简单很多.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//zip有多个参数的重载方法
Observable.zip(
//参数一:第一个接口的数据,并使用 map 将数据转换
Network.getGankApi().getBeauties(200,1).map(GankBeautyResultToItemsMapper.getInstance()),
//参数二:第二个接口的数据
Network.getZhuangbiApi().search("装逼"),
//参数三:将前两个接口的数据进行转换
new Func2<List<Item>, List<ZhuangbiImage>, List<Item>>() {
@Override
public List<Item> call(List<Item> gankItems, List<ZhuangbiImage> zhuangbiImages) {
List<Item> items = new ArrayList<Item>();
for (int i = 0; i < gankItems.size() / 2 && i < zhuangbiImages.size(); i++) {
items.add(gankItems.get(i * 2));
items.add(gankItems.get(i * 2 + 1));
Item zhuangbiItem = new Item();
ZhuangbiImage zhuangbiImage = zhuangbiImages.get(i);
zhuangbiItem.description = zhuangbiImage.description;
zhuangbiItem.imageUrl = zhuangbiImage.image_url;
items.add(zhuangbiItem);
}
return items;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);

2.flatMap 一对多的转换

flatMap 的原理是这样的:

  1. 使用传入的事件对象创建一个 Observable 对象;
  2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
  3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。

有时,在请求接口时,需要先获取到 token 才能正确返回结果,而 token 需要在另一个接口获取,这样就需要两步才能获取结果(①token -> ②目标数据),使用 flatMap()可以很清晰的实现这种连续请求.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
subscription =
//首先获取到 token
fakeApi.getFakeToken("fake_auth_code")
.flatMap(new Func1<FakeToken, Observable<FakeThing>>() {
@Override public Observable<FakeThing> call(FakeToken fakeToken) {
//获取到 token 之后 通过 token 再去请求真实的数据
return fakeApi.getFakeData(fakeToken);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<FakeThing>() {
@Override public void call(FakeThing fakeData) {
swipeRefreshLayout.setRefreshing(false);
tokenTv.setText("获取到的数据");
}
}, new Action1<Throwable>() {
@Override public void call(Throwable throwable) {
swipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "数据接在失败").show();
}
});

3.repeatWhen()和.retryWhen()

这两个操作符允许你重新订阅已经结束的Observable.
repeatWhen()和.retryWhen()区别在于什么样的终止事件会触发重订阅。
对于这两个操作符不熟悉的可以看下这篇文章:
http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2016/0206/3953.html

  • 使用.repeatWhen() + .delay()定期轮询数据:
1
2
3
4
5
6
source.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Void> completed) {
return completed.delay(5, TimeUnit.SECONDS);
}
})

直到notificationHandler发送onNext()才会重订阅到source。因为在发送onNext()之前delay了一段时间,所以优雅的实现了延迟重订阅,从而避免了不间断的数据轮询。

  • 使用.flatMap() + .timer()实现延迟重订阅:
1
2
3
4
5
6
7
8
9
10
11
source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {
return errors.flatMap(new Func1<Throwable, Observable<?>>() {
@Override public Observable<?> call(Throwable error) {
return Observable.timer(5, TimeUnit.SECONDS);
}
});
}
})
  • 当 token 失效时,重新获取 token 并且继续请求之前的接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
Observable.just(null)
.flatMap(new Func1<Object, Observable<FakeThing>>() {
@Override public Observable<FakeThing> call(Object o) {
return cachedFakeToken.token == null ? Observable.<FakeThing>error(
new NullPointerException("Token is null!")) : fakeApi.getFakeData(cachedFakeToken);
}
})
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override public Observable<?> call(Throwable throwable) {
if (throwable instanceof IllegalArgumentException
|| throwable instanceof NullPointerException) {
return fakeApi.getFakeToken("fake_auth_code").doOnNext(new Action1<FakeToken>() {
@Override public void call(FakeToken fakeToken) {
tokenUpdated = true;
cachedFakeToken.token = fakeToken.token;
cachedFakeToken.expired = fakeToken.expired;
}
});
}
return Observable.error(throwable);
}
});
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<FakeThing>() {
@Override public void call(FakeThing fakeData) {
swipeRefreshLayout.setRefreshing(false);
String token = cachedFakeToken.token;
if (tokenUpdated) {
token += "(已更新) ")";
}
tokenTv.setText("获取到的数据: ");
}
}, new Action1<Throwable>() {
@Override public void call(Throwable throwable) {
swipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "数据加载失败").show();
}
});

封装

1. 封装 Rx线程相关

对线程封装我们使用了一个很常用的操作符 compose(),这个方法接受一个Transformer 参数, Transformer 继承自Func1,有点类似于 map ,我们可以利用他将一种类型的 Observable转换成另一种Observable.

下面是RxTransformerHelper:

1
2
3
4
5
6
7
public static <T> Observable.Transformer<T, T> io_main() {
return new Observable.Transformer<T, T>() {
@Override public Observable<T> call(Observable<T> tObservable) {
return tObservable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());
}
};
}

使用:

1
2
3
4
Retrofits.api()
.modifyPswd(oldPswd, newPswd)
//👇这行代码
.compose(RxTransformerHelper.<Result<Success>>io_main())

2. 对返回数据结果进行处理

在开发中,后台返回给我们的数据格式一般都是类似的如下面这样:

1
2
3
4
5
6
7
8
9
10
{
"data": {
"user": {
"id": 1,
"nickname": "三分",
}
},
"error_code": 0,
"error_msg": ""
}

我们在使用时,当 error_code 的值不为0时,就取出 data 里面的数据,所以我使用了一个 map 将数据进行转换

1
.map(new HttpResultFunc<Success>())
1
2
3
4
5
6
7
8
9
public class HttpResultFunc<T> implements Func1<Result<T>, T> {
@Override public T call(Result<T> httpResult) {
if (httpResult.getErr_code() != 0) {
throw new ApiException(httpResult.getErr_msg());
}
return httpResult.getData();
}
}

当getErr_code() != 0时,说明有错误,就抛出一个自定义的异常,让subscribe去处理,没有错误就取出里面的数据.

3.封装Subscriber 对异常进行处理

我们已经处理服务器返回,可能有各种各样的异常,比如:
1、网络异常
2、服务器连接异常
3、接口请求参数等异常
我们可以对Subscriber 进行封装,我们关心的只是结果和错误的信息.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public abstract class RxSubscriber<T> extends Subscriber<T> {
@Override public void onCompleted() {
}
@Override public void onError(Throwable throwable) {
String errorMessage = null;
if (throwable instanceof HttpException) {
HttpException httpException = (HttpException) throwable;
errorMessage = Errors.errorResponse(httpException).error;
}
//自定义异常
else if (throwable instanceof ApiException) {
errorMessage = throwable.getMessage();
}
// 网络问题
else if (throwable instanceof ConnectException
|| throwable instanceof NetworkErrorException
|| throwable instanceof SocketTimeoutException
|| throwable instanceof UnknownHostException) {
errorMessage = "网络未连接或不可用,请检查后重试";
}
// 数据解析失败
else if (throwable instanceof JsonParseException
|| throwable instanceof JSONException
|| throwable instanceof ParseException
|| throwable instanceof ClassCastException
|| throwable instanceof IllegalStateException) {
errorMessage = "数据解析异常";
}
_onError(errorMessage);
}
@Override public void onNext(T t) {
_onNext(t);
}
protected abstract void _onNext(T t);
protected abstract void _onError(String error);
}

使用:

1
2
3
4
5
6
7
8
9
.subscribe(new RxSubscriber<Success>() {
@Override protected void _onNext(Success success) {
view.changePswdSuccess();
}
@Override protected void _onError(String error) {
view.showToast(error);
}
}));