草庐IT

Reactive编程思想

右耳菌 2023-10-12 原文

1. JDK9 Reactive - (真的要使用,建议使用jdk11)

Reactive响应式(反应式)编程是一种新的编程风格,其特点是异步或并发、事件驱动、推送PUSH机制以及观察者模式的衍生。reactive应用(响应式应用)允许开发人员构建事件驱动(event-driven),可扩展性,弹性的反应系统∶提供高度敏感的实时的用户体验感觉,可伸缩性和弹性的应用程序栈的支持,随时可以部署在多核和云计算架构。

  • 响应式编程与命令式编程的区别:
    在命令式编程中,a:=b+c意味着将b+c的结果赋值给a,并且此后b或c的值发生变化不会影响到a的值。而在响应式编程中,a的值会随着b或c的改变而自动更新,并且不需要重新执行a:=b+c来确定当前分配给a的值。(PS:很像angularjs、vuejs这种MVVM框架,视图绑定模型,模型变了,视图自动就跟着变了)

2. Reactive 的主要接口

  • Publisher: 发布者,数据的生产端。由它来提供数据的发生
  • Subscriber:消费者,此处可以定义获取到数据后响应的操作
  • Processor:消费者与发布者之间的数据处理
  • back pressire:背压,消费者告诉发布者自己能够处理多少数据

模拟一个例子

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo {

    public static void main(String[] args) throws InterruptedException {
        //1.定义发布者,数据类型是Integer
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        //2.定义消费者
        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                //订阅关系管理
                this.subscription = subscription;
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer integer) {
                //获取到数据后,开始处理
                System.out.println("我接收到的数据是:" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                System.out.println("数据处理完毕");

            }
        };

        // 发布者和订阅者建立联系
        publisher.subscribe(subscriber);

        //创建数据
        //TODO -- 数据库,redis,缓存 省略掉数据获取的步骤

        int data = 110;
        publisher.submit(data);
        publisher.close();

        Thread.currentThread().join(1000);
    }
}

执行结果

我接收到的数据是:110
数据处理完毕

3. 消费者的回调方法

  • onSubscribe:订阅关系处理,用它来响应发布者
  • onNext:接收到数据后会响应的方法
  • onError:出现任何错误时处理的方法
  • onComplete:任务完成后响应的方法

4. Flow类的代码

package java.util.concurrent;

public final class Flow {
    static final int DEFAULT_BUFFER_SIZE = 256;

    private Flow() {
    }

    public static int defaultBufferSize() {
        return 256;
    }

    public interface Processor<T, R> extends Flow.Subscriber<T>, Flow.Publisher<R> {
    }

    public interface Subscription {
        void request(long var1);

        void cancel();
    }

    public interface Subscriber<T> {
        void onSubscribe(Flow.Subscription var1);

        void onNext(T var1);

        void onError(Throwable var1);

        void onComplete();
    }

    @FunctionalInterface
    public interface Publisher<T> {
        void subscribe(Flow.Subscriber<? super T> var1);
    }
}

可以看到上边也定义了一个Professor接口,所以按理来说我们是可以使用一个类来实现这个接口,然后处理相关的逻辑的,以下就用这种方式来实现类似上边的一个例子。

  • MyProfessor

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer, String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer integer) {
        System.out.println("打印当前的数据:" + integer);
        this.submit(String.valueOf(integer + 100));
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        System.out.println("调用完成");
    }
}
  • MyFlowDemo
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class MyFlowDemo {

    public static void main(String[] args) throws InterruptedException {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        MyProcessor myProcessor = new MyProcessor();

        publisher.subscribe(myProcessor);

        //定义消费者 --
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1);
            }

            @Override
            public void onNext(String s) {
                System.out.println("接收到数据:" + s);
                throw new RuntimeException();
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("人为制造异常,执行了onerror!!!");
                throwable.printStackTrace();
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                System.out.println("发送方执行完毕");
            }
        };

        myProcessor.subscribe(subscriber);

        publisher.submit(111);
        publisher.close();

        Thread.currentThread().join(1000);

    }
}

如果觉得有收获就点个赞吧,更多知识,请点击关注查看我的主页信息哦~

有关Reactive编程思想的更多相关文章

  1. ruby - 寻找通过阅读代码确定编程语言的ruby gem? - 2

    几个月前,我读了一篇关于ruby​​gem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题:

  2. 网络编程套接字 - 2

    网络编程套接字网络编程基础知识理解源`IP`地址和目的`IP`地址理解源MAC地址和目的MAC地址认识端口号理解端口号和进程ID理解源端口号和目的端口号认识`TCP`协议认识`UDP`协议网络字节序socket编程接口`sockaddr``UDP`网络程序服务器端代码逻辑:需要用到的接口服务器端代码`udp`客户端代码逻辑`udp`客户端代码`TCP`网络程序服务器代码逻辑多个版本服务器单进程版本多进程版本多线程版本线程池版本服务器端代码客户端代码逻辑客户端代码TCP协议通讯流程TCP协议的客户端/服务器程序流程三次握手(建立连接)数据传输四次挥手(断开连接)TCP和UDP对比网络编程基础知识

  3. ruby - 我正在学习编程并选择了 Ruby。我应该升级到 Ruby 1.9 吗? - 2

    我完全不是程序员,正在学习使用Ruby和Rails框架进行编程。我目前正在使用Ruby1.8.7和Rails3.0.3,但我想知道我是否应该升级到Ruby1.9,因为我真的没有任何升级的“遗留”成本。缺点是什么?我是否会遇到与普通gem的兼容性问题,或者甚至其他我不太了解甚至无法预料的问题? 最佳答案 你应该升级。不要坚持从1.8.7开始。如果您发现不支持1.9.2的gem,请避免使用它们(因为它们很可能不被维护)。如果您对gem是否兼容1.9.2有任何疑问,您可以在以下位置查看:http://www.railsplugins.or

  4. ruby - 如何以编程方式删除实例上的 "singleton information"以使其编码(marshal)? - 2

    我创建了一个由于“在运行时执行的单例元类定义”而无法编码的对象(这段代码的描述是否正确?)。这是通过以下代码执行的:#defineclassXthatmyusesingletonclassmetaprogrammingfeatures#throughcallofmethod:break_marshalling!classXdefbreak_marshalling!meta_class=class我该怎么做才能使对象编码正确?是否可以从对象instance_of_x的classX中“移除”单例组件?我真的需要一个建议,因为我们的一些对象需要通过Marshal.dump序列化机制进行缓存。

  5. Ruby 元编程问题 - 2

    我正在查看Ruby日志记录库Logging.logger方法并从sourceatgithub提出问题与这段代码有关:logger=::Logging::Logger.new(name)logger.add_appendersappenderlogger.additive=falseclass我知道类 最佳答案 这实际上删除了方法(当它实际被执行时)。这是确保close不会被调用两次的保障措施。看起来好像有嵌套的“class 关于Ruby元编程问题,我们在StackOverflow上找到一

  6. ruby - Paperclip:以编程方式分配图像并设置其名称 - 2

    使用Paperclip,我想从这样的URL抓取图像:require'open-uri'user.photo=open(url)问题是我最后得到一个像“open-uri20110915-4852-1o7k5uw”这样的文件名。有什么方法可以更改user.photo上的文件名?作为一个额外的变化,Paperclip将我的文件存储在S3上,所以如果我可以在初始分配中设置我想要的文件名就更好了,这样图像就会上传到正确的S3key。像这样:user.photo=open(url),:filename=>URI.parse(url).path 最佳答案

  7. ruby - 如何以编程方式检查证书是否已被吊销? - 2

    我正在开发一个xcode自动构建系统。在执行一些预构建验证时,我想检查指定的证书文件是否已被撤销。我了解securityverify-cert验证其他证书属性但不验证吊销。我如何检查撤销?我正在用Ruby编写构建系统,但我对任何语言的想法都持开放态度。我阅读了这个答案(Openssl-Howtocheckifacertificateisrevokedornot),但指向底部的链接(DoesOpenSSLautomaticallyhandleCRLs(CertificateRevocationLists)now?)进入的Material对我的目的来说有点过于复杂(用户上传已撤销的证书是一

  8. ruby - 如何保持我不常用的编程语言技能 - 2

    关闭。这个问题是off-topic.它目前不接受答案。想改进这个问题吗?Updatethequestion所以它是on-topic用于堆栈溢出。关闭11年前。Improvethisquestion我不经常使用ruby​​-通常它加起来相当于每两个月或更长时间编写一次脚本。我的大部分编程都是使用C++进行的,这与ruby​​有很大不同。由于我与ruby​​之间的差距如此之大,我总是忘记语言的基本方面(比如解析文本文件和其他简单的东西)。我想每天练习一些基本的东西,我想知道是否有一些我可以订阅的网站,并且会向我发送当天的Ruby问题或类似的东西。有人知道这样的站点/Internet服务吗?

  9. ruby - 如何以编程方式将 mp3 转换为 itunes 可播放的 aac/m4a 文件? - 2

    我一直在寻找一种以编程方式或通过命令行将mp3转换为aac的方法,但没有成功。理想情况下,我有一段代码可以从我的Rails应用程序中调用,将mp3转换为aac。我安装了ffmpeg和libfaac,并能够使用以下命令创建aac文件:ffmpeg-itest.mp3-acodeclibfaac-ab163840dest.aac当我将输出文件的名称更改为dest.m4a时,它无法在iTunes中播放。谢谢! 最佳答案 FFmpeg提供AAC编码功能(如果您已编译它们)。如果您使用的是Windows,则可以从here获取完整的二进制文件。

  10. ruby - 以编程方式从字符串派生正则表达式 - 2

    我想输入一个字符串并返回一个可用于描述字符串结构的正则表达式。正则表达式将用于查找更多与第一个结构相同的字符串。这是故意模棱两可的,因为我肯定会漏掉SO社区中的某个人会发现的情况。请发布任何和所有可能的方法来做到这一点。 最佳答案 简单的答案(可能不是您想要的)是:返回输入字符串(正则表达式特殊字符转义)。这始终是与字符串匹配的正则表达式。如果您希望识别某些结构,则必须提供有关您希望识别的结构类型的更多信息。如果没有这些信息,问题就会以模棱两可的方式陈述,并且有许多可能的解决方案。例如,输入字符串'aba'可以描述为'阿巴''阿巴*

随机推荐