您好,Khabrovites。今天我们将讨论RxJava。我知道已经写了关于她的货车和小推车,但在我看来,我有几个有趣的观点值得分享。首先,我将告诉您我们如何将RxJava与VIPER架构一起用于Android应用程序,同时介绍使用它的“经典”方式。之后,让我们看一下RxJava的主要功能,并详细介绍如何安排调度程序。如果您已经准备好零食,那么欢迎来到猫下。
适合所有人的架构
RxJava是ReactiveX概念的实现,由Netflix创建。他们的博客上有一系列文章,介绍了为什么这样做以及解决了哪些问题。链接(1、2)可以在文章末尾找到。 Netflix在服务器端(后端)上使用RxJava来并行处理一个大请求。尽管他们提出了在后端使用RxJava的方法,但该体系结构适合于编写不同类型的应用程序(移动,桌面,后端以及许多其他应用程序)。 Netflix开发人员在服务层中使用RxJava的方式是,服务层的每个方法都返回一个Observable。关键是Observable中的元素可以同步和异步传递。这样,方法就可以自行决定是否立即立即立即返回该值(例如,如果在缓存中可用)或首先获取这些值(例如,从数据库或远程服务)并异步返回它们。无论如何,控件将在调用方法后立即返回(带有或不带有数据)。
/**
* , ,
* , ,
* callback `onNext()`
*/
public Observable<T> getProduct(String name) {
if (productInCache(name)) {
// ,
return Observable.create(observer -> {
observer.onNext(getProductFromCache(name));
observer.onComplete();
});
} else {
//
return Observable.<T>create(observer -> {
try {
//
T product = getProductFromRemoteService(name);
//
observer.onNext(product);
observer.onComplete();
} catch (Exception e) {
observer.onError(e);
}
})
// Observable IO
// /
.subscribeOn(Schedulers.io());
}
}
通过这种方法,我们为客户端(在本例中为控制器)和不同的实现获得了一个不变的API。客户端始终以相同的方式与Observable进行交互。是否同步接收值都没有关系。同时,API实现可以从同步更改为异步,而不会以任何方式影响与客户端的交互。使用这种方法,您完全不会考虑如何组织多线程,而只专注于业务任务的实现。
该方法不仅适用于后端的服务层,而且适用于MVC,MVP,MVVM等体系结构。例如,对于MVP,我们可以创建一个Interactor类,该类负责接收数据并将其保存到各种源,并进行所有操作其方法返回Observable。它们将是与Model交互的合同。这也将使Presenter能够利用RxJava中可用的运算符的全部功能。
我们可以更进一步,使Presenter成为反应式API,但是为此,我们需要正确实现取消订阅机制,该机制允许所有视图同时从Presenter取消订阅。
接下来,让我们看一下如何将此方法应用于VIPER体系结构的示例,VIPER体系结构是增强的MVP。还值得记住的是,您不能创建Observable单例对象,因为对此类Observable的订阅将产生内存泄漏。
拥有Android和VIPER的经验
在大多数当前和新的Android项目中,我们使用VIPER架构。当我加入已经使用过她的一个项目时,我遇到了她。我记得当我被问到是否正在寻找iOS时感到惊讶。我想:“ Android项目中的IOS吗?”同时,VIPER来自iOS世界,实际上是MVP的结构化和模块化版本。 VIPER在本文中写得很好(3)。
一开始,一切似乎都很好:正确地划分了层,而不是重载层,每一层都有自己的责任区,逻辑清晰。但是一段时间之后,一个弊端开始出现,随着项目的发展和变化,它甚至开始产生干扰。
事实是,我们与文章中的同事使用Interactor的方式相同。 Interactor实现了一个小用例,例如“从网络下载产品”或“按id从数据库中获取产品”,并在工作流中执行操作。在内部,交互器使用Observable执行操作。为了“运行” Interactor并获得结果,用户实现ObserverEntity接口及其onNext,onError和onComplete方法,并将其与参数一起传递给execute(params,ObserverEntity)方法。
您可能已经注意到了问题-接口的结构。实际上,我们很少需要这三种方法,通常只使用其中一种或两种。因此,空方法可能会出现在您的代码中。当然,我们可以将接口的所有方法都标记为默认方法,但是需要这样的方法才能向接口添加新功能。另外,有一个接口,其中所有方法都是可选的,这很奇怪。例如,我们还可以创建一个继承接口的抽象类,并覆盖所需的方法。或者,最后,创建可以接受一到三个功能接口的execute(params,ObserverEntity)方法的重载版本。这个问题不利于代码的可读性,但是幸运的是,它很容易解决。但是,她并不是唯一的一个。
saveProductInteractor.execute(product, new ObserverEntity<Void>() {
@Override
public void onNext(Void aVoid) {
// ,
//
}
@Override
public void onError(Throwable throwable) {
//
// -
}
@Override
public void onComplete() {
//
// -
}
});
除了空方法,还有一个更烦人的问题。我们使用Interactor来执行某些操作,但是几乎总是这种操作不是唯一的。例如,我们可以从数据库中获取产品,然后获取评论和有关该产品的图片,然后将其全部保存到另一个位置,最后转到另一个屏幕。在这里,每个动作都取决于前一个动作,并且在使用Interactors时,我们会获得大量的回调链,这些回调链非常麻烦。
private void checkProduct(int id, Locale locale) {
getProductByIdInteractor.execute(new TypesUtil.Pair<>(id, locale), new ObserverEntity<Product>() {
@Override
public void onNext(Product product) {
getProductInfo(product);
}
@Override
public void onError(Throwable throwable) {
// -
}
@Override
public void onComplete() {
}
});
}
private void getProductInfo(Product product) {
getReviewsByProductIdInteractor.execute(product.getId(), new ObserverEntity<List<Review>>() {
@Override
public void onNext(List<Review> reviews) {
product.setReviews(reviews);
saveProduct(productInfo);
}
@Override
public void onError(Throwable throwable) {
// -
}
@Override
public void onComplete() {
// -
}
});
getImageForProductInteractor.execute(product.getId(), new ObserverEntity<Image>() {
@Override
public void onNext(Image image) {
product.setImage(image);
saveProduct(product);
}
@Override
public void onError(Throwable throwable) {
// -
}
@Override
public void onComplete() {
}
});
}
private void saveProduct(Product product) {
saveProductInteractor.execute(product, new ObserverEntity<Void>() {
@Override
public void onNext(Void aVoid) {
}
@Override
public void onError(Throwable throwable) {
// -
}
@Override
public void onComplete() {
goToSomeScreen();
}
});
}
那么,您如何喜欢这种通心粉?同时,我们拥有简单的业务逻辑和单一嵌套,但请想象一下,如果使用更复杂的代码会发生什么。这也使得难以重用该方法并对Interactor应用不同的调度程序。
解决方案非常简单。您是否感觉这种方法试图模仿Observable的行为,但是这样做是错误的,并且会自身产生怪异的约束?正如我之前说过的,我们是从一个现有项目中获得此代码的。修复此旧代码时,我们将使用Netflix员工遗赠给我们的方法。不必每次都实现一个ObserverEntity,而让Interactor只需返回一个Observable即可。
private Observable<Product> getProductById(int id, Locale locale) {
return getProductByIdInteractor.execute(new TypesUtil.Pair<>(id, locale));
}
private Observable<Product> getProductInfo(Product product) {
return getReviewsByProductIdInteractor.execute(product.getId())
.map(reviews -> {
product.set(reviews);
return product;
})
.flatMap(product -> {
getImageForProductInteractor.execute(product.getId())
.map(image -> {
product.set(image);
return product;
})
});
}
private Observable<Product> saveProduct(Product product) {
return saveProductInteractor.execute(product);
}
private doAll(int id, Locale locale) {
//
getProductById (id, locale)
//
.flatMap(product -> getProductInfo(product))
//
.flatMap(product -> saveProduct(product))
//
.ignoreElements()
//
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
//
.subscribe(() -> goToSomeScreen(), throwable -> handleError());
}
瞧!因此,我们不仅摆脱了繁琐而笨拙的恐怖,而且还将RxJava的功能带给了Presenter。
核心概念
我经常看到他们如何使用功能性反应式编程(以下简称FRP)来解释RxJava的概念。实际上,它与该库无关。FRP更多地是关于连续动态改变值(行为),连续时间和指称语义。在本文的结尾,您可以找到几个有趣的链接(4、5、6、7)。
RxJava使用反应式编程和函数式编程作为其核心概念。响应式编程可以描述为从观察对象到观察者对象的信息顺序传输,以使观察者对象在出现此信息时自动(异步)接收信息。
函数式编程使用纯函数的概念,即不使用或不更改外部状态的函数;他们完全依靠他们的投入来获得他们的产出。纯函数没有副作用,因此可以将一个函数的结果用作另一个函数的输入参数。这使得可以构成无限的功能链。
将这两个概念与GoF的Observer和Iterator模式结合在一起,就可以创建异步数据流,并使用大量非常有用的功能对其进行处理。它还使非常简单且最重要的是安全地使用多线程成为可能,而无需考虑诸如同步,内存不一致,线程重叠等问题。
RxJava的三头鲸
构建RxJava的主要三个组件是Observable,运算符和调度程序。RxJava中的
Observable负责实现反应式范例。可观察对象通常被称为流,因为它们既实现了数据流的概念又传播了变化。Observable是一种通过组合来自“四人帮”的两种模式来实现反应性范式实现的类型:Observer和Iterator。Observable为Observer添加了两个缺少的语义,它们在Iterable中:
- 生产者向消费者发出信号,通知它不再有可用数据的能力(Iterable端的foreach循环并返回;在这种情况下,Observable调用onCompleate方法)。
- 生产者通知消费者发生错误并且Observable不再发出元素的能力(如果在迭代过程中发生错误,则Iterable引发异常; Observable对其观察者调用onError并退出)。
如果Iterable使用“拉”方法,也就是说,消费者从生产者请求一个值,并且线程阻塞直到该值到达,则Observable就是其“推”等效项。这意味着生产者仅在价值可用时才将其发送给消费者。
可观察只是RxJava的开始。它允许您异步获取值,但真正的强大功能来自于“ reactive extensions”(因此ReactiveX)-运算符它允许您转换,合并和创建Observable发出的元素序列。这就是功能范式凭借其纯功能而脱颖而出的地方。运营商充分利用了这一概念。它们使您可以安全地处理Observable发出的元素序列,而不必担心副作用,除非您自己创建它们。操作员允许多线程,而不必担心诸如线程安全性,低级线程控制,同步,内存不一致错误,线程覆盖等问题。拥有大量功能,您可以轻松处理各种数据。这为我们提供了非常强大的工具。要记住的主要事情是,运算符修改了Observable发出的项目,而不是Observable本身。自创建以来,可观察对象就永远不会改变。考虑线程和运算符时,最好考虑一下图表。如果您不知道如何解决问题,请考虑一下可用操作员的整个列表,然后再考虑。
尽管反应式编程的概念本身是异步的(不要与多线程混淆),但默认情况下,Observable中的所有项都在调用subscribe()方法的同一线程上同步传递给订阅服务器。要引入相同的异步,您需要在另一个执行线程中自己调用onNext(T),onError(Throwable),onComplete()方法,或使用调度程序。通常每个人都分析他们的行为,所以让我们看一下他们的结构。
策划者将用户从他们自己的API背后的并行化源中抽象出来。它们保证它们将提供特定的属性,而与底层的并发机制(实现)无关,例如线程,事件循环或执行器。调度程序使用守护程序线程。这意味着,即使在Observable运算符内部进行了一些计算,程序也会随着执行主线程的终止而终止。
RxJava有几个适用于特定目的的标准调度程序。它们都扩展了抽象的Scheduler类,并实现了自己的用于管理工作人员的逻辑。例如,在创建ComputationScheduler时,它会形成一个工作池,其数量等于处理器线程的数量。然后,ComputationScheduler使用工作程序执行可运行任务。您可以使用scheduleDirect()和schedulePeriodicallyDirect()方法将Runnable传递给调度程序。对于这两种方法,调度程序都会从池中取出下一个工作程序,并将Runnable传递给它。
工作程序位于调度程序内部,并且是一个使用几种并发方案之一执行Runnable对象(任务)的实体。换句话说,调度程序接收Runnable并将其传递给工作程序以执行。您还可以独立地从调度程序中获取一个工作程序,并将一个或多个Runnable转移给他,而与其他工作程序和调度程序本身无关。工人收到任务时,会将其放入队列。工作人员保证按提交顺序依次执行任务,但是挂起的任务可能会干扰顺序。例如,在ComputationScheduler中,使用单线程ScheduledExecutorService实现工作程序。
因此,我们有可以实现任何并行方案的抽象工作者。这种方法具有许多优点:模块化,灵活性,一个API,不同的实现。我们在ExecutorService中看到了类似的方法。另外,我们可以将调度程序与Observable分开使用。
结论
RxJava是一个非常强大的库,可以在许多体系结构中以多种方式使用。使用它的方法不仅限于现有方法,因此请始终尝试使其适应自己。但是,请记住有关SOLID,DRY和其他设计原则的知识,并且不要忘记与同事分享您的经验。希望您能够从本文中学到一些有趣的新东西,再见!