实现即时消息的方法有很多种比如websocket,sse; 而sse 又有spring mvc 实现的也有webflux 实现的。mvc实现的网上已经有很多了,而webflux 实现的不是很多,也不是很全,因此本文主要做的是webflux 实现的即时消息,sse 这里不多讲,如果有不理解的可以自行百度,谷歌。
maven 依赖在最下面
下面是最简单的实现也是应用场景最少的实现
@GetMapping(path = "/sse/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
public Flux<ServerSentEvent<String>> sse(@PathVariable String userId) {
// 每两秒推送一次
return Flux.interval(Duration.ofSeconds(2)).map(seq->
Tuples.of(seq, LocalDateTime.now())).log()
.map(data-> ServerSentEvent.<String>builder().id("1").data(data.getT2().toString()).build());
}
上面的适合股票之类的,周期性的消息。比如每两秒发送一次消息;这样的场景是合适的,但是如果是非周期性的消息呢?比如我需要再应用里发一个公告,这个公告是突然的,不确定的,那么这个逻辑就不合适了。
下面介绍非周期性消息
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
/**
* @author haoran
*/
@RestController
@RequestMapping("/sse")
public class MessageController implements ApplicationListener {
private final SubscribableChannel subscribableChannel = MessageChannels.publishSubscribe().get();
@GetMapping(value = "/message",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
public Flux<String> getMessage(){
return Flux.create(stringFluxSink -> {
MessageHandler messageHandler = message -> stringFluxSink.next(String.class.cast(message.getPayload()));
// 用户断开的时候取消订阅
stringFluxSink.onCancel(()->subscribableChannel.unsubscribe(messageHandler));
// 订阅消息
subscribableChannel.subscribe(messageHandler);
}, FluxSink.OverflowStrategy.LATEST);
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
subscribableChannel.send(new GenericMessage<>(event.getSource()));
}
@PostMapping("/publish")
public void publish(@RequestParam String message){
subscribableChannel.send(new GenericMessage<>(message));
}
}
这里有个局限性 就是单服务的消息,那如果是多服务的集群消息怎么解决呢?
下面代码是使用redis 的发布订阅模式来实现webflux 的sse 集群
import indi.houhaoran.webflux.domian.MessageDTO;
import lombok.RequiredArgsConstructor;
import org.redisson.api.RedissonClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
/**
* @author haoran
*/
@RestController
@RequestMapping("/flux")
@RequiredArgsConstructor
public class FluxMessageController {
private final RedissonClient redissonClient;
public static final String USER_TOPIC = "user:";
public static final String BROADCAST_TOPIC = "broadcast_topic";
@GetMapping(path = "/connect/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<MessageDTO> getFolderWatch(@PathVariable String userId) {
return Flux.create(sink -> {
// 订阅 广播
redissonClient.getTopic(BROADCAST_TOPIC).addListener(MessageDTO.class, (c, m) -> {
sink.next(m);
});
// 监听 用户主题 单个
redissonClient.getTopic(USER_TOPIC + userId).addListener(MessageDTO.class, (c, m) -> {
sink.next(m);
});
//加入监听如果断开链接就移除redis 的订阅
sink.onCancel(() -> {
// 断开移除
System.out.println("退出 userId:" + userId);
redissonClient.getTopic(USER_TOPIC + userId).removeAllListeners();
redissonClient.getTopic(BROADCAST_TOPIC).removeListener((Integer) redissonClient.getMap(BROADCAST_TOPIC).get(userId));
});
}, FluxSink.OverflowStrategy.LATEST);
}
@PostMapping("/publish")
public void publish(@RequestBody MessageDTO messageDTO) {
redissonClient.getTopic(BROADCAST_TOPIC).publish(messageDTO);
}
}
redisson 配置
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
// 这个地方不可使用 json 序列化,否则会有问题,会出现一个 java.lang.IllegalArgumentException: Value must not be null! 错误
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
return redisTemplate;
}
}
@Slf4j
@Configuration
public class RedissonConfigure {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress("redis://localhost:6379");
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.registerModule(new JavaTimeModule());
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
config.setCodec(new JsonJacksonCodec(objectMapper));
return Redisson.create(config);
}
}
其他类
import java.io.Serializable;
/**
* @author haoran
*/
@Data
public class MessageDTO implements Serializable {
private String message;
}
调试:



由此可见当我从8080 服务发送消息,8080,8081两个服务都接收到消息了
maven 依赖
<parent>
<artifactId>webfluxdemo</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>server</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>2.57</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
父pom
<groupId>org.example</groupId>
<artifactId>webfluxdemo</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.4</version>
</parent>
<modules>
<module>client</module>
<module>server</module>
<module>RxJava</module>
</modules>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
</dependencyManagement>
<!-- ... -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<url>https://repo.spring.io/snapshot</url>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<url>https://repo.spring.io/milestone</url>
</pluginRepository>
</pluginRepositories>
参考·1 Reactor 3 参考文档 (htmlpreview.github.io)
参考·2 https://www.lefer.cn/posts/30624/
结语:百度真垃圾,查了半天也没找到,终归要google;本文只是简单的实现了sse 在真实场景下会有很多不足,比如redis 加入订阅的是通过lamda 表达式实现的,这里最好有个实现类来实现订阅发送消息的业务。
题外话:webflux 如何实现响应式报表?
我正在尝试使用ruby和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我
我想安装一个带有一些身份验证的私有(private)Rubygem服务器。我希望能够使用公共(public)Ubuntu服务器托管内部gem。我读到了http://docs.rubygems.org/read/chapter/18.但是那个没有身份验证-如我所见。然后我读到了https://github.com/cwninja/geminabox.但是当我使用基本身份验证(他们在他们的Wiki中有)时,它会提示从我的服务器获取源。所以。如何制作带有身份验证的私有(private)Rubygem服务器?这是不可能的吗?谢谢。编辑:Geminabox问题。我尝试“捆绑”以安装新的gem..
最近,当我启动我的Rails服务器时,我收到了一长串警告。虽然它不影响我的应用程序,但我想知道如何解决这些警告。我的估计是imagemagick以某种方式被调用了两次?当我在警告前后检查我的git日志时。我想知道如何解决这个问题。-bcrypt-ruby(3.1.2)-better_errors(1.0.1)+bcrypt(3.1.7)+bcrypt-ruby(3.1.5)-bcrypt(>=3.1.3)+better_errors(1.1.0)bcrypt和imagemagick有关系吗?/Users/rbchris/.rbenv/versions/2.0.0-p247/lib/ru
在Rails4.0.2中,我使用s3_direct_upload和aws-sdkgems直接为s3存储桶上传文件。在开发环境中它工作正常,但在生产环境中它会抛出如下错误,ActionView::Template::Error(noimplicitconversionofnilintoString)在View中,create_cv_url,:id=>"s3_uploader",:key=>"cv_uploads/{unique_id}/${filename}",:key_starts_with=>"cv_uploads/",:callback_param=>"cv[direct_uplo
我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden
我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c
我想在Ruby中创建一个用于开发目的的极其简单的Web服务器(不,不想使用现成的解决方案)。代码如下:#!/usr/bin/rubyrequire'socket'server=TCPServer.new('127.0.0.1',8080)whileconnection=server.acceptheaders=[]length=0whileline=connection.getsheaders想法是从命令行运行这个脚本,提供另一个脚本,它将在其标准输入上获取请求,并在其标准输出上返回完整的响应。到目前为止一切顺利,但事实证明这真的很脆弱,因为它在第二个请求上中断并出现错误:/usr/b
您如何在Rails中的实时服务器上进行有效调试,无论是在测试版/生产服务器上?我试过直接在服务器上修改文件,然后重启应用,但是修改好像没有生效,或者需要很长时间(缓存?)我也试过在本地做“脚本/服务器生产”,但是那很慢另一种选择是编码和部署,但效率很低。有人对他们如何有效地做到这一点有任何见解吗? 最佳答案 我会回答你的问题,即使我不同意这种热修补服务器代码的方式:)首先,你真的确定你已经重启了服务器吗?您可以通过跟踪日志文件来检查它。您更改的代码显示的View可能会被缓存。缓存页面位于tmp/cache文件夹下。您可以尝试手动删除
我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=
我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi