在这章我们将讨论 Kotlin 中的 RxJava.这是一个不可思议的组合,我们将利用它们使我们的 App 到达另一个层次,解耦我们的 UI 线程与后台线程,在这种情况下从服务端请求 Reddit 新闻.

全部章节:

Kotlin — Part 0:关于这个系列
Kotlin — Part 1:配置 Android Studio
Kotlin — Part 2:语法,空安全,静态类型
Kotlin — Part 3:扩展函数、Android 扩展、委托属性
Kotlin — Part 4:RecyclerView— Kotlin 适配器委托&数据类
Kotlin — Part 5:Kotlin,RxJava&RxAndroid
Kotlin — Part 6:API-Retrofit&Kotlin)
Kotlin — Part 7:无限滑动:高阶函数& Lambdas
Kotlin — Part 8:方向改变(序列化&数据类)
Kotlin — Part 9:单元测试与 Kotlin(Mockito,RxJava)

Github 仓库:https://github.com/imuhao/KedditBySteps

如果现在你第一次听说 RxJava,我推荐你先阅读这个文章.

RxJava 是一个 响应式扩展实现,一个由异步和基于事件驱动的第三方框架.

我们将使用 Observable 类避免从主线程中请求服务端数据(或任何长时间操作)

News Manager

NewsManager 类从 Reddit API 提供新闻数据.它负责制定服务请求,给我们一个新闻 item 集合.

这个主要的思想是使我们的 API 调用在主线程以外的其它线程执行.一个大的图片将解释我们所要做的:

Observable

这个 NewsManager 类将提供一个方法返回一个 Observable 对象,允许你在另一个地方运行一块代码(在这种情况下载一个新的线程).

1
2
3
fun getNews(): Observable<List<RedditNewsItem>> {
...
}

我们将在 NewsManager 中创建 Observable 对象.我们将提供这个对象的实现.

1
2
3
4
5
6
7
8
9
class NewsManager {
fun getNews(): Observable<List<RedditNewsItem>> {
return Observable.create { subscriber ->
var news = mutableListOf<RedditNewsItem>()
subscriber.onNext(news)
subscriber.onCompleted()
}
}
}

感谢 Kotlin,我们可以使用 lambda 表达式为 Observable.create() 方法提供需求.这个 create() 方法需要你提供一个 subscriber 对象,这个 subscriber 对象允许你通过3个方法发送事件到订阅者.

  • onNext(item:T): 我们调用这个方法为订阅者提供从服务端接受到的新闻数据,在这种情况下根据 getNews() 的返回类型推断出 T 将是” List“类型
  • onCompleted():调用这个方法代表Observable 已经完成提供所有的数据,我们将在调用 onNext() 方法之后调用这个方法,准备好结束这个进程.
  • onError(e:Throwable): 如果我们调用 API 出现任何错误将使用这个方法告诉订阅者,我们将之后通过 SnackBar 向用户展示错误信息.

Lazy NewsManager

我们为 NewsManager 使用 lazy 属性

1
2
3
private val newsManager by lazy {
NewsManager()
}

这个 newsManager 将只在第一次使用时初始化 NewsManager()

从 NewsFragment 请求新闻

现在是时候使用我们的 NewsManager, 请求一些新闻显示在 RecyclerView.

Subscribe>Subscription>Observer

为了从 NewsManager请求 Reddit 新闻, 我们在 NewsFragment 将NewsManager.getNews() 得到的 Observable 转变为 Observer. 我们要做的是调用 getNews() 方法,调用这个方法的” subscribe()”接受这个 Observable:

1
2
3
4
5
6
7
val subscription = newsManager.getNews().subscribe (
{ retrievedNews ->
...
},
{ e ->
...
}

这个 subscribe 有好几个重载方法,我们将使用这个:

1
2
3
4
5
public final Subscription subscribe(
final Action1<? super T> onNext,
final Action1<Throwable> onError) {
...
}

它接受两个函数

  • onNext:当 Observable 的 onNext 调用时这个函数将被调用,我们将使用它设置 NewsAdapter.
  • onError: 当 Observable 的 onError 调用时候这个函数被调用,我们使用它显示 SncakBar 与错误消息.

它返回一个 Subscription 对象,这个对象允许我们管理这个subscription, 像检测是否订阅或取消订阅.

再一次感谢 Kotlin, 我们可以对这两个函数使用 lambda 表达式.从这个例子的第一个函数,这个 retrievedNews 变量是我们从 onNext() 方法接受新闻数据的名字.

1
2
3
4
{
retrievedNews ->
(news_list.adapter as NewsAdapter).addNews(retrievedNews)
}

记得onNext 返回类型是” List“,所以我要做的是直接将这些数据设置给 NewsAdapter.

在 onError 函数,我只是告诉用户错误信息,我可以使用” e”,一个 Throwable 类型得到详细错误信息

1
2
3
{ e ->
Snackbar.make(...).show()
}

现在我们仍然在主线程

如果我们现在运行 APP, 它仍然工作,因为我们现在的数据是从 Observable 中获取的,但是如果我们在 Observable 中指定长时间的操作 APP 将会停止运行,因为现在还在主线程中.

我们没有为 Observable 指定任何具体细节,它将运行默认的配置,代码会执行在调用时相同的线程.所以让我们配置我们的 Observable 在其它线程执行,但是在主线程中通知事件.

SubscribeOn

1
2
3
val subscription = newsManager.getNews()
.subscribeOn(Schedulers.io())
.subscribe (...)

这个方法 subscribeOn(…)允许你移动Observable 代码到其它线程,使用指定的行为.要做到这一点, RxJava 使用 Schedulers 提供了一个调度列表:

  • io: IO操作线程
  • computation:计算线程
  • newThread:为每一次工作创建一个新的线程
  • test:通常用来测试

在这种情况下,我们使用 Schedulers.io()来执行 API 请求

ObserveOn

ObserveOn 是另一个 Observable 类的方法,它允许你定义在哪里执行”subscribe(…)”方法.换句话说,它可以指定我们的 onNext和 onError 函数所在的线程.我们需要指定这些代码在主线程中执行.

在这里我们需要添加RxAndroid 依赖有一个新的调度器叫:

AndroidSchedulers.mainThread()

之后我们需要更新我们的代码,设置 observeOn 调度器

1
2
3
4
5
val subscription = newsManager.getNews()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe ( ...
)

Subscriptions

为了避免在订阅期间我们的 fragment 被关闭或者应用关闭,我们必须管理所有的订阅者,在关闭时对它们全部解绑.一个好的办法是创建一个 CompositeSubscription对象添加所有创建的订阅者,这个类通过 RxJava 提供,运行你通过调用一个方法解绑所有的观察者.我们在 onResume 中初始化 CompositeSubscription对象,在 onPause 方法中解绑.

1
2
3
4
5
6
7
8
9
10
11
var subscriptions = CompositeSubscription()
override fun onResume() {
super.onResume()
subscriptions = CompositeSubscription()
}
override fun onPause() {
super.onPause()
subscriptions.clear()
}

现在我们唯一要做的事情就是添加我们的订阅者到这个新的CompositeSubscription对象,在任何时候我们都可以离开我们的 App.

1
2
3
4
5
6
private fun requestNews() {
val subscription = newsManager.getNews()
.subscribeOn(Schedulers.io())
.subscribe (...)
subscriptions.add(subscription) // add the subscription
}

为了使代码更加可读,我将这些代码移动到一个新的基类” BaseFragment”中,使我们的 fragment 默认有这些逻辑.

总结

正如你看到的 Kotlin 和 RxJava 可以完美的集合再一起,在这个部分我们只是看到了一小部分,你可以看到 RxJava 与 Kotlin 集合,代码看起来更加可读和理解.