草庐IT

Spring Cloud 微服务架构下的 WebSocket 解决方案

普通网友 2023-07-01 原文

WebSocket在现代浏览器中的应用已经算是比较普遍了,在某些业务场景下,要求必须能够在服务器端推送消息至客户端。在没有WebSocket的年代,我们使用过dwr,在那个时候dwr真实一个非常棒的方案。但是在WebSocket兴起之后,我们更愿意使用标准实现来解决问题、

首先交代一下,本篇文章不讲解WebSocket的配置,主要讲的是针对在微服务架构集群模式下解决方案的选择。

微服务架构大家应该都不陌生了,在微服务架构下,服务是分布式的,而且为了保证业务的可用性,每个服务都是以集群的形式存在。在集群模式下,要保证集群的每一个节点的访问得到相同的结果就需要做到数据一致性,如缓存、session等。

微服务集群缓存通常使用分布式缓存redis解决,session一致性也通常会通过redis解决,但是现在更流行的是无状态的Http,即无session化,最常见的解决方案就是OAuth。

WebSocket有所不同,它是与服务端建立一个长连接,在集群模式下,显然不可能把前端与服务集群中的每一个节点建立连接,一个可行的思路是像解决http session的共享一样,通过redis来实现websocket的session共享,但是websocket session的数量是远多于http session的数量的(因为每打开一个页面都会建立一个websocket连接),所以随着用户量的增长,共享的数据量太大,很容易造成瓶颈。

另一个思路是,websocket总归会与集群中某个节点建立连接,那么,只要找到连接所在的节点,就可以向服务端推送消息了,那么要解决的问题就是如何找到一个websocket连接所在的节点。要找到连接在哪个节点上,我们需要一个唯一的标识符用于寻找连接,然而在基于stomp的发布-订阅模式下,一个消息的推送可能是面向若干个连接的,可能分布在集群中的每一个节点上,这样去寻找连接的代价也很高。既然这样,我们不妨换种思路,每一个websocket消息,我们在集群的每个节点上都进行推送,订阅了该消息的连接,不管有一个还是一万个,最终肯定都能收到这个消息。基于这个思路,我们做了一些技术选型:

  • RabbitMQ
  • Spring Cloud Stream

首先说RabbitMQ,高级消息队列,可以实现消息广播(当然kafka一样可以做到,这里只介绍一种),另一项技术是Spring Cloud Stream,stream是一个用于构建高度可扩展事件驱动型微服务的框架,并且它可以跟RabbitMQ、Kafka以及其他多种消息服务集成,使用了stream,要把rabbitmq换成kafka只不过是改改配置的事情。接下来重点介绍使用方法:

引入依赖

<dependency>
? ? <groupId>org.springframework.cloud</groupId>
? ? <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
? ? <groupId>org.springframework.cloud</groupId>
? ? <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

配置Binder

binder是stream中的重要概念,是用于配置用于stream发布和订阅事件的消息中间件。先看一段配置:

spring:
? cloud:
? ? stream:
? ? ? binders:
? ? ? ? defaultRabbit:
? ? ? ? ? type: rabbit
? ? ? ? ? environment:
? ? ? ? ? ? spring:
? ? ? ? ? ? ? rabbitmq:
? ? ? ? ? ? ? ? host: localhost
? ? ? ? ? ? ? ? username: username
? ? ? ? ? ? ? ? password: password
? ? ? ? ? ? ? ? virtual-host: /

配置中的 defaultRabbit 是binder的名称,一会会在其他配置中引用,type指定了消息中间件的类型,environment是对消息中间件的配置,这里的配置结构和spring.rabbitmq命名空间下的配置项一模一样的,可以参照着进行配置(这样配置的作用是可以把stream的rabbitmq配置和项目中其他地方使用的rabbitmq区分开,如果这里不配置environment,binder会沿用spring.rabbitmq命名空间下的配置),比如你的项目中的rabbitmq的配置是这样的:

spring:
? rabbitmq:
? ? host: localhost
? ? username: username
? ? password: password
? ? virtual-host: /

那上门的binder的environment配置完全可以去掉。

消息流与binder的绑定

微服务要接收挥着发布事件消息,根据spring cloud stream的名字,顾名思义,需要使用流,所以需要在配置中声明两个事件流,一个输入流,一个输出流:

spring:
? cloud:
? ? stream:
? ? ? bindings:
? ? ? ? websocketMessageIn:
? ? ? ? ? destination: websocketMessage
? ? ? ? ? binder: defaultRabbit
? ? ? ? websocketMessageOut:
? ? ? ? ? destination: websocketMessage
? ? ? ? ? binder: defaultRabbit

这里我们看到,事件流引用了binder,表示这两个流使用rabbitmq这个中间件(看到这里想必大家已经明白了,在一个项目中完全可以同时使用rabbit和kafka作为事件流的消息中间件)。

websocketMessageIn,websocketMessageOut是事件流的名字(可以自己随便起),destination指定了两个事件流的destination是同一个,这决定了写入和读取是指向同一个地方(不一定是同一个消息队列)。

事件流声明

事件流使用接口进行定义:

/**
?* websocket消息事件流接口
?* Created by 吴昊 on 18-11-8.
?*
?* @author 吴昊
?* @since 1.4.3
?*/
interface WebSocketMessageStream {
? companion object {
? ? const val INPUT: String = "webSocketMessageIn"
? ? const val OUTPUT: String = "webSocketMessageOut"
? }

? /**
? ?* 输入
? ?*/
? @Input(INPUT)
? fun input(): SubscribableChannel

? /**
? ?* 输出
? ?*/
? @Output(OUTPUT)
? fun output(): MessageChannel
}

声明事件流接口,这里面定义了两个常量,分别对应配置中的两个流名称,通过调用input()方法获取输入流,通过调用output()获取输出流。

该接口的实现由spring cloud stream完成,不需要自己实现。

使用事件流

声明一个bean:

@Component
@EnableBinding(WebSocketMessageStream::class)
class WebSocketMessageService {
……

这里的@EnableBinding 注解指明了事件流接口类,只有添加了这个注解(要能被Spring识别到,可以加在入口类上,也可以加在@Configuration注解的类上),该接口才会被实现,并且加入到Spring的容器中(可以注入)。

上面WebSocketMessageService的内容如下:

@Autowired
? private lateinit var stream: WebSocketMessageStream
? @Autowired
? private lateinit var template: SimpMessagingTemplate

? @StreamListener(WebSocketMessageStream.INPUT)
? fun messageReceived(message: WebSocketMessage) {
? ? template.convertAndSend(message.destination, message.body)
? }

? fun send(destination: String, body: Any) {
? ? stream.output().send(
? ? ? ? MutableMessage(WebSocketMessage(destination, body))
? ? )
? }

接收消息

@StreamListener 注解指明了要监听的事件流,方法接收的参数即事件的消息内容(使用jackson反序列化),这里的messageReceived方法直接将接收到的消息直接用websocket发送给前端

发送消息

同样,发送也很简单,将消息直接发送到输入流中,上面的send方法即是将原本应该用SimpMessagingTemplate发送给websocket的消息发送到spring cloud stream的事件流中。这样做以后,项目中所有需要向前端推送webSocket消息的操作都应该调用send方法来进行。

讲到这里大家可能还有点糊涂,也有一些疑问,为什么这样每个微服务节点就能收到事件消息了?或者单个节点接收事件消息和多个节点接收的配置是怎么控制的。各位不要着急,待我慢慢道来,接下来就要结合rabbit的知识来讲解
了:

首先看一下rabbit的消息队列:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aqcmKJN1-1661299441476)(https://user-gold-cdn.xitu.io/2018/12/2/1676ec65c54fc3de?imageView2/0/w/1280/h/960/format/webp/ignore-error/1)]

从图中看到,存在多个以webSocketMessage开头的队列,这是每一个微服务节点创建了一个消息队列,再来看exchange:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5c7CJFk0-1661299441476)(https://user-gold-cdn.xitu.io/2018/12/2/1676ec8502bbbad9?imageView2/0/w/1280/h/960/format/webp/ignore-error/1)]
exchange绑定的消息队列

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-M34qeD2P-1661299441477)(https://user-gold-cdn.xitu.io/2018/12/2/1676ec8eec4700d8?imageView2/0/w/1280/h/960/format/webp/ignore-error/1)]

这里的exchange名称和上面消息队列的名称前缀均是webSocketMessage, 这个都是**由前面的binding配置中的destination指定的,和destination名称保持一致**

当应用向输入流中写入事件时,使用destination作为key(即webSocketMessage),将消息写入名为webSocketMessage的exchange,由于exchange绑定的消息队列前缀均为webSocketMessage且routing key都是#,所以exchange会将消息路由到每一个webSocketMessage开头的消息队列上(这里涉及到rabbitmq的知识点,如过不懂请自行查阅资料),这样每一个微服务都能接收到相同的消息。

我们再来看前面提出的问题,这样的配置可以把消息推送到每一个微服务节点,那么如果需要一个消息只被一个节点接收,该怎么配置呢?很简单,一个配置项就可以搞定:

spring:
? cloud:
? ? stream:
? ? ? bindings:
? ? ? ? websocketMessageIn:
? ? ? ? ? group: test
? ? ? ? ? destination: websocketMessage
? ? ? ? ? binder: defaultRabbit

可以看到,相比前面的配置,仅仅多了一个group的配置,这样配置之后,rabbitmq会生成一个名为websocketMessage.test的消息队列(前面讲到的每个微服务建立的消息队列是自动删除的,即微服务断开连接后消息队列就被删除,而这个消息队列是持久化的,也就是即使所有的微服务节点全部断开连接也不会被删除),所有的微服务节点监听这一个队列,当队列中有消息时,只会被一个节点消费。

要讲的内容到此结束,spring cloud stream的配置远不止这些,但是这些配置已足够完成我所需要做的事情,其他的配置请参考spring cloud stream官方文档:

http://cloud.spring.io/spring-cloud-static/spring-cloud-stream/Fishtown.RC2/single/spring-cloud-stream.html

l

有关Spring Cloud 微服务架构下的 WebSocket 解决方案的更多相关文章

  1. ruby - 在 jRuby 中使用 'fork' 生成进程的替代方案? - 2

    在MRIRuby中我可以这样做:deftransferinternal_server=self.init_serverpid=forkdointernal_server.runend#Maketheserverprocessrunindependently.Process.detach(pid)internal_client=self.init_client#Dootherstuffwithconnectingtointernal_server...internal_client.post('somedata')ensure#KillserverProcess.kill('KILL',

  2. 屏幕录制为什么没声音?检查这2项,轻松解决 - 2

    相信很多人在录制视频的时候都会遇到各种各样的问题,比如录制的视频没有声音。屏幕录制为什么没声音?今天小编就和大家分享一下如何录制音画同步视频的具体操作方法。如果你有录制的视频没有声音,你可以试试这个方法。 一、检查是否打开电脑系统声音相信很多小伙伴在录制视频后会发现录制的视频没有声音,屏幕录制为什么没声音?如果当时没有打开音频录制,则录制好的视频是没有声音的。因此,建议在录制前进行检查。屏幕上没有声音,很可能是因为你的电脑系统的声音被禁止了。您只需打开电脑系统的声音,即可录制音频和图画同步视频。操作方法:步骤1:点击电脑屏幕右下侧的“小喇叭”图案,在上方的选项中,选择“声音”。 步骤2:在“声

  3. 【高数】用拉格朗日中值定理解决极限问题 - 2

    首先回顾一下拉格朗日定理的内容:函数f(x)是在闭区间[a,b]上连续、开区间(a,b)上可导的函数,那么至少存在一个,使得:通过这个表达式我们可以知道,f(x)是函数的主体,a和b可以看作是主体函数f(x)中所取的两个值。那么可以有,  也就意味着我们可以用来替换 这种替换可以用在求某些多项式差的极限中。方法: 外层函数f(x)是一致的,并且h(x)和g(x)是等价无穷小。此时,利用拉格朗日定理,将原式替换为 ,再进行求解,往往会省去复合函数求极限的很多麻烦。使用要注意:1.要先找到主体函数f(x),即外层函数必须相同。2.f(x)找到后,复合部分是等价无穷小。3.要满足作差的形式。如果是加

  4. Observability:从零开始创建 Java 微服务并监控它 (二) - 2

    这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/

  5. 深度学习部署:Windows安装pycocotools报错解决方法 - 2

    深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal

  6. ruby - Faye WebSocket,关闭处理程序被触发后重新连接到套接字 - 2

    我有一个super简单的脚本,它几乎包含了FayeWebSocketGitHub页面上用于处理关闭连接的内容:ws=Faye::WebSocket::Client.new(url,nil,:headers=>headers)ws.on:opendo|event|p[:open]#sendpingcommand#sendtestcommand#ws.send({command:'test'}.to_json)endws.on:messagedo|event|#hereistheentrypointfordatacomingfromtheserver.pJSON.parse(event.d

  7. Ruby 守护进程和 JRuby - 备选方案 - 2

    我有一个应用程序正在从Ruby迁移到JRuby(由于需要通过Java提供更好的Web服务安全支持)。我使用的gem之一是daemons创建后台作业。问题在于它使用fork+exec来创建后台进程,但这对JRuby来说是禁忌。那么-是否有用于创建后台作业的替代gem/wrapper?我目前的想法是只从shell脚本调用rake并让rake任务永远运行......提前致谢,克里斯。更新我们目前正在使用几个与Java线程相关的包装器,即https://github.com/jmettraux/rufus-scheduler和https://github.com/philostler/acts

  8. ruby - 如何更快地解决 project euler #21? - 2

    原始问题Letd(n)bedefinedasthesumofproperdivisorsofn(numberslessthannwhichdivideevenlyinton).Ifd(a)=bandd(b)=a,whereab,thenaandbareanamicablepairandeachofaandbarecalledamicablenumbers.Forexample,theproperdivisorsof220are1,2,4,5,10,11,20,22,44,55and110;therefored(220)=284.Theproperdivisorsof284are1,2,

  9. ruby - Ruby 和 Ruby on Rails 中的三层架构 - 2

    我是一名决定学习Ruby和RubyonRails的ASP.NETMVC开发人员。我已经有所了解并在RoR上创建了一个网站。在ASP.NETMVC上开发,我一直使用三层架构:数据层、业务层和UI(或表示)层。尝试在RubyonRails应用程序中使用这种方法,我发现没有关于它的信息(或者也许我只是找不到它?)。也许有人可以建议我如何在RubyonRails上创建或使用三层架构?附言我使用ruby​​1.9.3和RubyonRails3.2.3。 最佳答案 我建议在制作RoR应用程序时遵循RubyonRails(RoR)风格。Rails

  10. ruby - 为什么这些方法没有解决? - 2

    这个问题在这里已经有了答案:WhydoRubysettersneed"self."qualificationwithintheclass?(3个答案)关闭29天前。给定这段代码:classSomethingattr_accessor:my_variabledefinitialize@my_variable=0enddeffoomy_variable=my_variable+3endends=Something.news.foo我收到这个错误:test.rb:9:in`foo':undefinedmethod`+'fornil:NilClass(NoMethodError)fromtes

随机推荐