草庐IT

手写RxJava简易框架领悟RxJava的美秒

天上飘的是浮云 2023-04-09 原文

RxJava笔记

前言

看此篇之前最好知道RxJava的使用。由于RxJava内部源码实现有点复杂,既然用拆轮子的方式来分析源码比较难啃,不如换种方式,以造轮子的方式,将源码中与性能、兼容性、扩展性有关的代码剔除,只留下核心代码,加上我个人的理解,带大家揭秘RxJava的实现原理(本文不涉及框架的使用介绍)。

一、构建观察者类

Subsribler在RxJava里面是一个抽象类,它实现了Observer接口。

public interface Observer<T> {

    void onCompleted();

    void onError(Throwable throwable);

    void onNext(T value);
}

public abstract class Subscriber<T> implements Observer<T>{

    public void onStart(){

    }
}

二、构建被观察者

Observable(被观察者)拥有很多工厂方法和各式各样的操作符。每个Observable里面都维护了一个OnSubscribe对象,并通过subscribe()里面的call(Subscriber<? super T> subscriber)方法与观察者产生联系。

public class Observable<T> {

    final OnSubscribe<T> onSubscribe;

    private  Observable(OnSubscribe<T> onSubscribe){
        this.onSubscribe = onSubscribe;
    }

    public static <T> Observable<T> create(OnSubscribe<T> onSubscribe){
        return new Observable<T>(onSubscribe);
    }

    public void subscribe(Subscriber<T> subscriber){
        subscriber.onStart();
        onSubscribe.call(subscriber);
    }

    public interface OnSubscribe<T>{
        void call(Subscriber<? super T> subscriber);
    }
}

三、RxJava的事件流雏形产生

通过上面写的观察者和被观察者,即可写出一个没有操作符和线程切换功能的简易版Rxjava。

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for(int i = 0; i < 10; i++){
                    subscriber.onNext(i);
                }
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(Integer value) {
                System.out.println("Result: "+value);
            }
        });

通过Observable.create将OnSubscribe的匿名类传给Observable,在subscribe()时回调OnSubscribe接口中的call方法,同时call方法参数即为subscribe的参数,即观察者,因此继续回调subscriber.onNext()即可完成观察者里的逻辑。

结果如下:

image.png

四、玩转RxJava里的操作符

RxJava之所以强大好用,与其拥有丰富灵活的操作符是分不开的。那么我们就试着为这个框架添加一个最常用的操作符:map。先看代码:

    public <R> Observable<R> map(final Fun1<T, R> transformer){
        return create(new OnSubscribe<R>() {
            @Override
            public void call(final Subscriber<? super R> subscriber) {
                Observable.this.onSubscribe.call(new Subscriber<T>() {
                    @Override
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        subscriber.onError(throwable);
                    }

                    @Override
                    public void onNext(T value) {
                        subscriber.onNext(transformer.transfer(value));
                    }
                });
            }
        });
    }

    public interface Fun1<T, R>{
        R transfer(T from);
    }

测试代码

 Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for(int i = 0; i < 10; i++){
                    subscriber.onNext(i);
                }
            }
        }).map(new Observable.Fun1<Integer, String>() {
             @Override
             public String transfer(Integer from) {
                 return String.valueOf(from)+"_Map";
             }
         }
        ).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(String value) {
                System.out.println("Result: "+value);
            }
        });

结果如下:


image.png
  • 其实RxJava每调用一次操作符的方法,就相当于在上层数据源和下层观察者之间桥接了一个新的Observable。桥接的Observable内部会实例化新的OnSuscribe和Subscriber。

  • 新建的OnSuscribe的call方法负责持有目标Subscriber,此时就可以回调subscriber的方法来完成观察的行为了。但是这是还没有数据源,想要获得数据源必须调用源Observable.OnSubscribe的subscribe方法,传入一个新的Subscriber,这样就可以在它的onNext()方法中获得数据源,并经过传入的接口处理后,发送给最终的Subscriber。

总体来说就是源Observable.OnSubscribe将Event往下发送给桥接Observable.Subscriber,最终桥接Observable.Subscriber将Event做相应处理后转发给目标Subscriber。

五、RxJava里的线程切换

RxJava中最激动人心的功能是异步处理,能够自如地切换线程。

利用subscribeOn() 结合observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。 observeOn() 可以多次调用,Subscriber的执行线程与最后一次observeOn()的调用有关。但subscribeOn() 多次调用只有第一个subscribeOn() 起作用。

这是因为 observeOn() 作用的是Subscriber,而subscribeOn() 作用的是OnSubscribe,这时事件还没开始发送,因此subscribeOn()的线程控制可以从事件发出的开端就造成影响。

线程调度除了桥接Observable以外,RxJava还用到一个很关键的类Scheduler(调度器)。

5.1 Scheduler核心代码如下:
public class Scheduler {
    private final static Scheduler ioScheduler
            = new Scheduler(Executors.newSingleThreadExecutor());

    Executor executor;

    public Scheduler(Executor executor){
        this.executor = executor;
    }

    public Worker createWorker(){
        return new Worker(executor);
    }

    public static class Worker {
        Executor executor1;
        public Worker(Executor executor1){
            this.executor1 = executor1;
        }

        public void schedule(Runnable runnable){
            executor1.execute(runnable);
        }
    }

    public static Scheduler io(){
        return ioScheduler;
    }
}

具体的Scheduler的实现类就不看了,但我们需要知道,能做到线程切换的关键是Worker的schedule方法,因为它会把传过来的任务放入线程池,并在新线程中执行。

5.2 实现observeOn

observeOn是作用于下层Subscriber的,需要让下层Subscriber的事件处理方法放到新线程中执行。为此,在Observable类里面,添加如下代码:

public Observable<T> observeOn(final Scheduler scheduler){
        return create(new OnSubscribe<T>() {
            @Override
            public void call(final Subscriber<? super T> subscriber) {
                subscriber.onStart();
                final Scheduler.Worker worker = scheduler.createWorker();
                Observable.this.onSubscribe.call(new Subscriber<T>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable throwable) {
                    }

                    @Override
                    public void onNext(final T value) {
                        worker.schedule(new Runnable() {
                            @Override
                            public void run() {
                                subscriber.onNext(value);
                            }
                        });
                    }
                });
            }
        });
    }

测试代码如下:

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for(int i = 0; i < 10; i++){
                    subscriber.onNext(i);
                }
            }
        }).map(new Observable.Fun1<Integer, String>() {
                   @Override
                   public String transfer(Integer from) {
                       return String.valueOf(from)+"_Map";
                   }
               }
        ).observeOn(Scheduler.io()).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(String value) {
                System.out.println("Result: "+Thread.currentThread().getName());
            }
        });

结果如下:

image.png
5.3 实现subscribeOn

subscribeOn是作用于上层OnSubscribe的,可以让OnSubscribe的call方法在新线程中执行。

因此,在Observable类里面,添加如下代码:

public Observable<T> subscribeOn(final Scheduler scheduler){
        return create(new OnSubscribe<T>() {
            @Override
            public void call(final Subscriber<? super T> subscriber) {
                scheduler.createWorker().schedule(new Runnable() {
                    @Override
                    public void run() {
                        Observable.this.onSubscribe.call(subscriber);
                    }
                });
            }
        });
    }

测试代码如下:

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                System.out.println("Observable thread: "+Thread.currentThread().getName());
                for(int i = 0; i < 10; i++){
                    subscriber.onNext(i);
                }
            }
        }).map(new Observable.Fun1<Integer, String>() {
                   @Override
                   public String transfer(Integer from) {
                       System.out.println("Map Observable thread: "+Thread.currentThread().getName());
                       return String.valueOf(from)+"_Map";
                   }
               }
        ).observeOn(Scheduler.io()).subscribeOn(Scheduler.io()).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(String value) {
//                System.out.println("Result: "+Thread.currentThread().getName());
            }
        });

结果如下:


image.png

六、总结

相信看RxJava这个简易版的设计对大家的启示,比网上的一些源码解析清晰的多,希望可以抛砖引玉。有时候我们总是认为看几篇博文貌似当时就懂了明白了,但是这种理解或者说记忆貌似不持久。过了一段时间总是还给博主了。学习还是得深入源码,从源码中学习,然后在结合其他人的博客查漏补缺,这样才是自己的东西。大家有兴趣可以把flatMap等其他操作符来自己实现一下。

有关手写RxJava简易框架领悟RxJava的美秒的更多相关文章

  1. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  2. TimeSformer:抛弃CNN的Transformer视频理解框架 - 2

    Transformers开始在视频识别领域的“猪突猛进”,各种改进和魔改层出不穷。由此作者将开启VideoTransformer系列的讲解,本篇主要介绍了FBAI团队的TimeSformer,这也是第一篇使用纯Transformer结构在视频识别上的文章。如果觉得有用,就请点赞、收藏、关注!paper:https://arxiv.org/abs/2102.05095code(offical):https://github.com/facebookresearch/TimeSformeraccept:ICML2021author:FacebookAI一、前言Transformers(VIT)在图

  3. ruby - sinatra 框架的 MVC 模式 - 2

    我想开始使用“Sinatra”框架进行编码,但我找不到该框架的“MVC”模式。是“MVC-Sinatra”模式或框架吗? 最佳答案 您可能想查看Padrino这是一个围绕Sinatra构建的框架,可为您的项目提供更“类似Rails”的感觉,但没有那么多隐藏的魔法。这是使用Sinatra可以做什么的一个很好的例子。虽然如果您需要开始使用这很好,但我个人建议您将它用作学习工具,以对您来说最有意义的方式使用Sinatra构建您自己的应用程序。写一些测试/期望,写一些代码,通过测试-重复:)至于ORM,你还应该结帐Sequel其中(imho

  4. ruby-on-rails - 正确了解 Rails 框架的最佳方式是什么? - 2

    按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visitthehelpcenter指导。关闭10年前。我一直在Rails上做两个项目,它们运行良好,但在这个过程中重新发明了轮子,自来水(和热水)和止痛药,正如我随后了解到的那样,这些已经存在于框架中。那么基本上,正确了解框架中所有智能部分的最佳方法是什么,这将节省时间而不是自己构建已经实现的功能?从第1页开始阅读文档?是否有公开所有内容的特定示例应用程序?一个特定的开源项目?所有的rails交通?还是完全

  5. ruby - 自动将院子文档框架添加到现有的 Rails 遗留代码中 - 2

    关闭。这个问题不符合StackOverflowguidelines.它目前不接受答案。我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。关闭4年前。Improvethisquestion我希望能够将模板化的YARD文档样式注释插入到我现有的Rails遗留应用程序中。目前它的评论很少。我想要具有指定参数的类header和方法header(通过从我假定的方法签名中提取)和返回值的占位符。在PHP代码中,我有一些工具可以检查代码并在适当的位置创建插入到代码中的文档header注释。在带有Ducktyping等的Ruby中,我确信诸如@params等类型之类

  6. ruby-on-rails - 具有六边形架构和 DCI 模式的框架和数据库适配器 - 2

    我尝试用Ruby设计一个基于Web的应用程序。我开发了一个简单的核心应用程序,在没有框架和数据库的情况下在六边形架构中实现DCI范例。核心六边形中有小六边形和网络,数据库,日志等适配器。每个六边形都在没有数据库和框架的情况下自行运行。在这种方法中,我如何提供与数据库模型和实体类的关系作为独立于数据库的关系。我想在将来将框架从Rails更改为Sinatra或数据库。事实上,我如何在这个核心Hexagon中实现完全隔离的rails和mongodb的数据库适配器或框架适配器。有什么想法吗? 最佳答案 ROM呢?(Ruby对象映射器)。还有

  7. python - Ruby 是否有相当于 Python 的扭曲框架作为网络抽象层? - 2

    据我了解,Python的扭曲框架为网络通信提供了更高级别的抽象(?)。我正在寻找在Rails应用程序中使用与twisted等效的Ruby。 最佳答案 看看EventMachine.它不像Twisted那样广泛,但它是围绕事件驱动网络编程的相同概念构建的。 关于python-Ruby是否有相当于Python的扭曲框架作为网络抽象层?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/9

  8. ruby-on-rails - 使用 Rails 以外的 Ruby 框架是否有任何潜在的缺点? - 2

    我想使用比Rails(Sinatra/Ramaze/Camping)更轻的框架,但我担心这样做我将无法使用许多以插件形式为Rails定制的共享库.这是一个主要问题,还是这些插件中的大多数都可以跨不同的Ruby框架使用?使用Ruby框架而不是Rails是否还有其他潜在的缺点? 最佳答案 您仍然可以使用gems在你提到的所有框架中,很多东西都是可重用的。想要交换一个新的ORM,没问题。想要一个花哨的shmacy语法高亮,没问题。Rails一直在大力插入摆脱旧的插件模型,转而使用gems。如果其他框架之一符合您的需求,最好使用它。请记住,

  9. ruby - 应该 validate_format_of 。 not_with 在框架中有问题(或者在我的理解中) - 2

    我将以下代码放入RSpec测试中:it{shouldvalidate_format_of(:email).not_with('test@test')}并设置实际的类:validates:email,:presence=>true,:format=>/\b[A-Z0-9._%-]+@(?:[A-Z0-9-]+\.)+[A-Z]{2,4}\b/i当我运行测试时,我得到:失败:1)用户失败/错误:它{应该validate_format_of(:email).not_with('test@test')}当电子邮件设置为“test@test”时,预期错误包括“can'tbeblank”,得到错误

  10. ruby-on-rails - Rails 使用了哪些测试框架? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭9年前。ImprovethisquestionRails使用了哪些单元测试框架?我正在阅读一本书(PragmaticProgrammersAgileDev.withRails),其中展示了如何在Rails中进行单元测试。这本书向我展示了默认的Rails测试套件(Test::Unit的子类)。这是Rails社区中使用的主要测试框架吗?我在执行常规ruby​​时使用RSpec,我也希望能够在Rails中使用它(如果不是太麻烦的话)?

随机推荐