草庐IT

webflux 实现服务端推送消息

simians 2023-03-28 原文

实现即时消息的方法有很多种比如websocket,sse; 而sse 又有spring mvc 实现的也有webflux 实现的。mvc实现的网上已经有很多了,而webflux 实现的不是很多,也不是很全,因此本文主要做的是webflux 实现的即时消息,sse 这里不多讲,如果有不理解的可以自行百度,谷歌。
maven 依赖在最下面
下面是最简单的实现也是应用场景最少的实现

    @GetMapping(path = "/sse/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
    public Flux<ServerSentEvent<String>> sse(@PathVariable String userId) {
        // 每两秒推送一次
        return Flux.interval(Duration.ofSeconds(2)).map(seq->
            Tuples.of(seq, LocalDateTime.now())).log()
                .map(data-> ServerSentEvent.<String>builder().id("1").data(data.getT2().toString()).build());
    }

上面的适合股票之类的,周期性的消息。比如每两秒发送一次消息;这样的场景是合适的,但是如果是非周期性的消息呢?比如我需要再应用里发一个公告,这个公告是突然的,不确定的,那么这个逻辑就不合适了。
下面介绍非周期性消息


import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * @author haoran
 */
@RestController
@RequestMapping("/sse")
public class MessageController implements ApplicationListener {
    private final SubscribableChannel subscribableChannel = MessageChannels.publishSubscribe().get();

    @GetMapping(value = "/message",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
    public Flux<String> getMessage(){
        return Flux.create(stringFluxSink -> {
            MessageHandler messageHandler = message -> stringFluxSink.next(String.class.cast(message.getPayload()));
            // 用户断开的时候取消订阅
            stringFluxSink.onCancel(()->subscribableChannel.unsubscribe(messageHandler));
            // 订阅消息
            subscribableChannel.subscribe(messageHandler);
        }, FluxSink.OverflowStrategy.LATEST);
    }


    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        subscribableChannel.send(new GenericMessage<>(event.getSource()));
    }
    @PostMapping("/publish")
    public void publish(@RequestParam String message){
        subscribableChannel.send(new GenericMessage<>(message));

    }
}

这里有个局限性 就是单服务的消息,那如果是多服务的集群消息怎么解决呢?
下面代码是使用redis 的发布订阅模式来实现webflux 的sse 集群

import indi.houhaoran.webflux.domian.MessageDTO;
import lombok.RequiredArgsConstructor;
import org.redisson.api.RedissonClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * @author haoran
 */
@RestController
@RequestMapping("/flux")
@RequiredArgsConstructor
public class FluxMessageController {
    private final RedissonClient redissonClient;
    public static final String USER_TOPIC = "user:";
    public static final String BROADCAST_TOPIC = "broadcast_topic";

    @GetMapping(path = "/connect/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<MessageDTO> getFolderWatch(@PathVariable String userId) {
        return Flux.create(sink -> {
            // 订阅 广播
            redissonClient.getTopic(BROADCAST_TOPIC).addListener(MessageDTO.class, (c, m) -> {
                sink.next(m);
            });
            // 监听 用户主题 单个
            redissonClient.getTopic(USER_TOPIC + userId).addListener(MessageDTO.class, (c, m) -> {
                sink.next(m);
            });
            //加入监听如果断开链接就移除redis 的订阅
            sink.onCancel(() -> {
                // 断开移除
                System.out.println("退出 userId:" + userId);
                redissonClient.getTopic(USER_TOPIC + userId).removeAllListeners();
                redissonClient.getTopic(BROADCAST_TOPIC).removeListener((Integer) redissonClient.getMap(BROADCAST_TOPIC).get(userId));
            });
        }, FluxSink.OverflowStrategy.LATEST);
    }

    @PostMapping("/publish")
    public void publish(@RequestBody MessageDTO messageDTO) {
        redissonClient.getTopic(BROADCAST_TOPIC).publish(messageDTO);
    }
}

redisson 配置

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        // 这个地方不可使用 json 序列化,否则会有问题,会出现一个 java.lang.IllegalArgumentException: Value must not be null! 错误
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        return redisTemplate;
    }
}

@Slf4j
@Configuration
public class RedissonConfigure {

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        SingleServerConfig singleServerConfig = config.useSingleServer();
        singleServerConfig.setAddress("redis://localhost:6379");
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        config.setCodec(new JsonJacksonCodec(objectMapper));
        return Redisson.create(config);
    }
}

其他类

import java.io.Serializable;

/**
 * @author haoran
 */
@Data
public class MessageDTO implements Serializable {
    private String message;
}

调试:


postman
服务8080
服务8081

由此可见当我从8080 服务发送消息,8080,8081两个服务都接收到消息了

maven 依赖

    <parent>
        <artifactId>webfluxdemo</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>server</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
        <dependency>
            <groupId>de.ruedigermoeller</groupId>
            <artifactId>fst</artifactId>
            <version>2.57</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

父pom

    <groupId>org.example</groupId>
    <artifactId>webfluxdemo</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.4</version>
    </parent>
    <modules>
        <module>client</module>
        <module>server</module>
        <module>RxJava</module>
    </modules>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
    </dependencies>
    <dependencyManagement>

    </dependencyManagement>
    <!-- ... -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <url>https://repo.spring.io/snapshot</url>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <url>https://repo.spring.io/milestone</url>
        </pluginRepository>
    </pluginRepositories>

参考·1 Reactor 3 参考文档 (htmlpreview.github.io)
参考·2 https://www.lefer.cn/posts/30624/
结语:百度真垃圾,查了半天也没找到,终归要google;本文只是简单的实现了sse 在真实场景下会有很多不足,比如redis 加入订阅的是通过lamda 表达式实现的,这里最好有个实现类来实现订阅发送消息的业务。
题外话:webflux 如何实现响应式报表?

有关webflux 实现服务端推送消息的更多相关文章

  1. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

    我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

  2. ruby - 具有身份验证的私有(private) Ruby Gem 服务器 - 2

    我想安装一个带有一些身份验证的私有(private)Rubygem服务器。我希望能够使用公共(public)Ubuntu服务器托管内部gem。我读到了http://docs.rubygems.org/read/chapter/18.但是那个没有身份验证-如我所见。然后我读到了https://github.com/cwninja/geminabox.但是当我使用基本身份验证(他们在他们的Wiki中有)时,它会提示从我的服务器获取源。所以。如何制作带有身份验证的私有(private)Rubygem服务器?这是不可能的吗?谢谢。编辑:Geminabox问题。我尝试“捆绑”以安装新的gem..

  3. ruby-on-rails - 启动 Rails 服务器时 ImageMagick 的警告 - 2

    最近,当我启动我的Rails服务器时,我收到了一长串警告。虽然它不影响我的应用程序,但我想知道如何解决这些警告。我的估计是imagemagick以某种方式被调用了两次?当我在警告前后检查我的git日志时。我想知道如何解决这个问题。-bcrypt-ruby(3.1.2)-better_errors(1.0.1)+bcrypt(3.1.7)+bcrypt-ruby(3.1.5)-bcrypt(>=3.1.3)+better_errors(1.1.0)bcrypt和imagemagick有关系吗?/Users/rbchris/.rbenv/versions/2.0.0-p247/lib/ru

  4. ruby-on-rails - s3_direct_upload 在生产服务器中不工作 - 2

    在Rails4.0.2中,我使用s3_direct_upload和aws-sdkgems直接为s3存储桶上传文件。在开发环境中它工作正常,但在生产环境中它会抛出如下错误,ActionView::Template::Error(noimplicitconversionofnilintoString)在View中,create_cv_url,:id=>"s3_uploader",:key=>"cv_uploads/{unique_id}/${filename}",:key_starts_with=>"cv_uploads/",:callback_param=>"cv[direct_uplo

  5. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  6. ruby-on-rails - 如何在 Rails View 上显示错误消息? - 2

    我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c

  7. ruby - 用 Ruby 编写一个简单的网络服务器 - 2

    我想在Ruby中创建一个用于开发目的的极其简单的Web服务器(不,不想使用现成的解决方案)。代码如下:#!/usr/bin/rubyrequire'socket'server=TCPServer.new('127.0.0.1',8080)whileconnection=server.acceptheaders=[]length=0whileline=connection.getsheaders想法是从命令行运行这个脚本,提供另一个脚本,它将在其标准输入上获取请求,并在其标准输出上返回完整的响应。到目前为止一切顺利,但事实证明这真的很脆弱,因为它在第二个请求上中断并出现错误:/usr/b

  8. ruby-on-rails - 在 Rails 中调试生产服务器 - 2

    您如何在Rails中的实时服务器上进行有效调试,无论是在测试版/生产服务器上?我试过直接在服务器上修改文件,然后重启应用,但是修改好像没有生效,或者需要很长时间(缓存?)我也试过在本地做“脚本/服务器生产”,但是那很慢另一种选择是编码和部署,但效率很低。有人对他们如何有效地做到这一点有任何见解吗? 最佳答案 我会回答你的问题,即使我不同意这种热修补服务器代码的方式:)首先,你真的确定你已经重启了服务器吗?您可以通过跟踪日志文件来检查它。您更改的代码显示的View可能会被缓存。缓存页面位于tmp/cache文件夹下。您可以尝试手动删除

  9. ruby - 使用 Ruby 通过 Outlook 发送消息的最简单方法是什么? - 2

    我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=

  10. Ruby - 如何将消息长度表示为 2 个二进制字节 - 2

    我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi

随机推荐