如何通过Android中的VIPER与RxJava成为朋友,应用方法和调度程序的结构

图片



您好,Khabrovites。今天我们将讨论RxJava。我知道已经写了关于她的货车和小推车,但在我看来,我有几个有趣的观点值得分享。首先,我将告诉您我们如何将RxJava与VIPER架构一起用于Android应用程序,同时介绍使用它的“经典”方式。之后,让我们看一下RxJava的主要功能,并详细介绍如何安排调度程序。如果您已经准备好零食,那么欢迎来到猫下。



适合所有人的架构



RxJava是ReactiveX概念的实现,由Netflix创建。他们的博客上有一系列文章,介绍了为什么这样做以及解决了哪些问题。链接(1、2)可以在文章末尾找到。 Netflix在服务器端(后端)上使用RxJava来并行处理一个大请求。尽管他们提出了在后端使用RxJava的方法,但该体系结构适合于编写不同类型的应用程序(移动,桌面,后端以及许多其他应用程序)。 Netflix开发人员在服务层中使用RxJava的方式是,服务层的每个方法都返回一个Observable。关键是Observable中的元素可以同步和异步传递。这样,方法就可以自行决定是否立即立即立即返回该值(例如,如果在缓存中可用)或首先获取这些值(例如,从数据库或远程服务)并异步返回它们。无论如何,控件将在调用方法后立即返回(带有或不带有数据)。



/**
 * ,    ,  
 * ,      ,
 *        callback `onNext()`
 */
public Observable<T> getProduct(String name) {
    if (productInCache(name)) {
        //   ,   
        return Observable.create(observer -> {
           observer.onNext(getProductFromCache(name));
           observer.onComplete();
        });
    } else {
        //     
        return Observable.<T>create(observer -> {
            try {
                //     
                T product = getProductFromRemoteService(name);
                //  
                observer.onNext(product);
                observer.onComplete();
            } catch (Exception e) {
                observer.onError(e);
            }
        })
        //  Observable   IO
        //  / 
        .subscribeOn(Schedulers.io());
    }
}


通过这种方法,我们为客户端(在本例中为控制器)和不同的实现获得了一个不变的API。客户端始终以相同的方式与Observable进行交互。是否同步接收值都没有关系。同时,API实现可以从同步更改为异步,而不会以任何方式影响与客户端的交互。使用这种方法,您完全不会考虑如何组织多线程,而只专注于业务任务的实现。



该方法不仅适用于后端的服务层,而且适用于MVC,MVP,MVVM等体系结构。例如,对于MVP,我们可以创建一个Interactor类,该类负责接收数据并将其保存到各种源,并进行所有操作其方法返回Observable。它们将是与Model交互的合同。这也将使Presenter能够利用RxJava中可用的运算符的全部功能。



图片



我们可以更进一步,使Presenter成为反应式API,但是为此,我们需要正确实现取消订阅机制,该机制允许所有视图同时从Presenter取消订阅。



接下来,让我们看一下如何将此方法应用于VIPER体系结构的示例,VIPER体系结构是增强的MVP。还值得记住的是,您不能创建Observable单例对象,因为对此类Observable的订阅将产生内存泄漏。



拥有Android和VIPER的经验



在大多数当前和新的Android项目中,我们使用VIPER架构。当我加入已经使用过她的一个项目时,我遇到了她。我记得当我被问到是否正在寻找iOS时感到惊讶。我想:“ Android项目中的IOS吗?”同时,VIPER来自iOS世界,实际上是MVP的结构化和模块化版本。 VIPER在本文中写得很好(3)。



一开始,一切似乎都很好:正确地划分了层,而不是重载层,每一层都有自己的责任区,逻辑清晰。但是一段时间之后,一个弊端开始出现,随着项目的发展和变化,它甚至开始产生干扰。



事实是,我们与文章中的同事使用Interactor的方式相同。 Interactor实现了一个小用例,例如“从网络下载产品”或“按id从数据库中获取产品”,并在工作流中执行操作。在内部,交互器使用Observable执行操作。为了“运行” Interactor并获得结果,用户实现ObserverEntity接口及其onNext,onError和onComplete方法,并将其与参数一起传递给execute(params,ObserverEntity)方法。



您可能已经注意到了问题-接口的结构。实际上,我们很少需要这三种方法,通常只使用其中一种或两种。因此,空方法可能会出现在您的代码中。当然,我们可以将接口的所有方法都标记为默认方法,但是需要这样的方法才能向接口添加新功能。另外,有一个接口,其中所有方法都是可选的,这很奇怪。例如,我们还可以创建一个继承接口的抽象类,并覆盖所需的方法。或者,最后,创建可以接受一到三个功能接口的execute(params,ObserverEntity)方法的重载版本。这个问题不利于代码的可读性,但是幸运的是,它很容易解决。但是,她并不是唯一的一个。



saveProductInteractor.execute(product, new ObserverEntity<Void>() {
    @Override
    public void onNext(Void aVoid) {
        //      ,
        //     
    }

    @Override
    public void onError(Throwable throwable) {
        //    
        // - 
    }

    @Override
    public void onComplete() {
        //     
        // - 
    }
});


除了空方法,还有一个更烦人的问题。我们使用Interactor来执行某些操作,但是几乎总是这种操作不是唯一的。例如,我们可以从数据库中获取产品,然后获取评论和有关该产品的图片,然后将其全部保存到另一个位置,最后转到另一个屏幕。在这里,每个动作都取决于前一个动作,并且在使用Interactors时,我们会获得大量的回调链,这些回调链非常麻烦。



private void checkProduct(int id, Locale locale) {
    getProductByIdInteractor.execute(new TypesUtil.Pair<>(id, locale), new ObserverEntity<Product>() {
        @Override
        public void onNext(Product product) {
            getProductInfo(product);
        }

        @Override
        public void onError(Throwable throwable) {
            // - 
        }

        @Override
        public void onComplete() {
        }
    });
}

private void getProductInfo(Product product) {
    getReviewsByProductIdInteractor.execute(product.getId(), new ObserverEntity<List<Review>>() {
        @Override
        public void onNext(List<Review> reviews) {
            product.setReviews(reviews);
            saveProduct(productInfo);
        }

        @Override
        public void onError(Throwable throwable) {
            // - 
        }

        @Override
        public void onComplete() {
            // - 
        }
    });
    getImageForProductInteractor.execute(product.getId(), new ObserverEntity<Image>() {
        @Override
        public void onNext(Image image) {
            product.setImage(image);
            saveProduct(product);
        }

        @Override
        public void onError(Throwable throwable) {
            // - 
        }

        @Override
        public void onComplete() {
        }
    });
}

private void saveProduct(Product product) {
    saveProductInteractor.execute(product, new ObserverEntity<Void>() {
        @Override
        public void onNext(Void aVoid) {
        }

        @Override
        public void onError(Throwable throwable) {
            // - 
        }

        @Override
        public void onComplete() {
            goToSomeScreen();
        }
    });
}


那么,您如何喜欢这种通心粉?同时,我们拥有简单的业务逻辑和单一嵌套,但请想象一下,如果使用更复杂的代码会发生什么。这也使得难以重用该方法并对Interactor应用不同的调度程序。



解决方案非常简单。您是否感觉这种方法试图模仿Observable的行为,但是这样做是错误的,并且会自身产生怪异的约束?正如我之前说过的,我们是从一个现有项目中获得此代码的。修复此旧代码时,我们将使用Netflix员工遗赠给我们的方法。不必每次都实现一个ObserverEntity,而让Interactor只需返回一个Observable即可。



private Observable<Product> getProductById(int id, Locale locale) {
    return getProductByIdInteractor.execute(new TypesUtil.Pair<>(id, locale));
}

private Observable<Product> getProductInfo(Product product) {
    return getReviewsByProductIdInteractor.execute(product.getId())
    .map(reviews -> {
        product.set(reviews);
        return product;
    })
    .flatMap(product -> {
        getImageForProductInteractor.execute(product.getId())
        .map(image -> {
            product.set(image);
            return product;
        })
    });
}

private Observable<Product> saveProduct(Product product) {
    return saveProductInteractor.execute(product);
}

private doAll(int id, Locale locale) {
    //    
    getProductById (id, locale)
    //  
    .flatMap(product -> getProductInfo(product))
    //     
    .flatMap(product -> saveProduct(product))
    //        
    .ignoreElements()
    //  
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    //    
    .subscribe(() -> goToSomeScreen(), throwable -> handleError());
}


瞧!因此,我们不仅摆脱了繁琐而笨拙的恐怖,而且还将RxJava的功能带给了Presenter。



核心概念



我经常看到他们如何使用功能性反应式编程(以下简称FRP)来解释RxJava的概念。实际上,它与该库无关。FRP更多地是关于连续动态改变值(行为),连续时间和指称语义。在本文的结尾,您可以找到几个有趣的链接(4、5、6、7)。



RxJava使用反应式编程和函数式编程作为其核心概念。响应式编程可以描述为从观察对象到观察者对象的信息顺序传输,以使观察者对象在出现此信息时自动(异步)接收信息。



函数式编程使用纯函数的概念,即不使用或不更改外部状态的函数;他们完全依靠他们的投入来获得他们的产出。纯函数没有副作用,因此可以将一个函数的结果用作另一个函数的输入参数。这使得可以构成无限的功能链。



将这两个概念与GoF的Observer和Iterator模式结合在一起,就可以创建异步数据流,并使用大量非常有用的功能对其进行处理。它还使非常简单且最重要的是安全地使用多线程成为可能,而无需考虑诸如同步,内存不一致,线程重叠等问题。



图片



RxJava的三头鲸



构建RxJava的主要三个组件是Observable,运算符和调度程序。RxJava中的

Observable负责实现反应式范例。可观察对象通常被称为流,因为它们既实现了数据流的概念又传播了变化。Observable是一种通过组合来自“四人帮”的两种模式来实现反应性范式实现的类型:Observer和Iterator。Observable为Observer添加了两个缺少的语义,它们在Iterable中:

  • 生产者向消费者发出信号,通知它不再有可用数据的能力(Iterable端的foreach循环并返回;在这种情况下,Observable调用onCompleate方法)。
  • 生产者通知消费者发生错误并且Observable不再发出元素的能力(如果在迭代过程中发生错误,则Iterable引发异常; Observable对其观察者调用onError并退出)。


如果Iterable使用“拉”方法,也就是说,消费者从生产者请求一个值,并且线程阻塞直到该值到达,则Observable就是其“推”等效项。这意味着生产者仅在价值可用时才将其发送给消费者。



可观察只是RxJava的开始。它允许您异步获取值,但真正的强大功能来自于“ reactive extensions”(因此ReactiveX)-运算符它允许您转换,合并和创建Observable发出的元素序列。这就是功能范式凭借其纯功能而脱颖而出的地方。运营商充分利用了这一概念。它们使您可以安全地处理Observable发出的元素序列,而不必担心副作用,除非您自己创建它们。操作员允许多线程,而不必担心诸如线程安全性,低级线程控制,同步,内存不一致错误,线程覆盖等问题。拥有大量功能,您可以轻松处理各种数据。这为我们提供了非常强大的工具。要记住的主要事情是,运算符修改了Observable发出的项目,而不是Observable本身。自创建以来,可观察对象就永远不会改变。考虑线程和运算符时,最好考虑一下图表。如果您不知道如何解决问题,请考虑一下可用操作员的整个列表,然后再考虑。



尽管反应式编程的概念本身是异步的(不要与多线程混淆),但默认情况下,Observable中的所有项都在调用subscribe()方法的同一线程上同步传递给订阅服务器。要引入相同的异步,您需要在另一个执行线程中自己调用onNext(T),onError(Throwable),onComplete()方法,或使用调度程序。通常每个人都分析他们的行为,所以让我们看一下他们的结构。



策划者将用户从他们自己的API背后的并行化源中抽象出来。它们保证它们将提供特定的属性,而与底层的并发机制(实现)无关,例如线程,事件循环或执行器。调度程序使用守护程序线程。这意味着,即使在Observable运算符内部进行了一些计算,程序也会随着执行主线程的终止而终止。



RxJava有几个适用于特定目的的标准调度程序。它们都扩展了抽象的Scheduler类,并实现了自己的用于管理工作人员的逻辑。例如,在创建ComputationScheduler时,它会形成一个工作池,其数量等于处理器线程的数量。然后,ComputationScheduler使用工作程序执行可运行任务。您可以使用scheduleDirect()和schedulePeriodicallyDirect()方法将Runnable传递给调度程序。对于这两种方法,调度程序都会从​​池中取出下一个工作程序,并将Runnable传递给它。



工作程序位于调度程序内部,并且是一个使用几种并发方案之一执行Runnable对象(任务)的实体。换句话说,调度程序接收Runnable并将其传递给工作程序以执行。您还可以独立地从调度程序中获取一个工作程序,并将一个或多个Runnable转移给他,而与其他工作程序和调度程序本身无关。工人收到任务时,会将其放入队列。工作人员保证按提交顺序依次执行任务,但是挂起的任务可能会干扰顺序。例如,在ComputationScheduler中,使用单线程ScheduledExecutorService实现工作程序。



图片



因此,我们有可以实现任何并行方案的抽象工作者。这种方法具有许多优点:模块化,灵活性,一个API,不同的实现。我们在ExecutorService中看到了类似的方法。另外,我们可以将调度程序与Observable分开使用。



结论



RxJava是一个非常强大的库,可以在许多体系结构中以多种方式使用。使用它的方法不仅限于现有方法,因此请始终尝试使其适应自己。但是,请记住有关SOLID,DRY和其他设计原则的知识,并且不要忘记与同事分享您的经验。希望您能够从本文中学到一些有趣的新东西,再见!



  1. Netflix开始使用ReactiveX的原因
  2. RxJava向Internet社区的演示
  3. VIPER
  4. Conal Elliot



All Articles