📞 文章简介:WebSocket实时通知Demo
💡 创作目的:因为公司正在从零搭建CRM,其中有一个需求是系统通知管理,老板发布通知给员工。简单的用数据库实现感觉缺少一些实时性,不是那么生动。于是想到了使用WebSocket建立通讯,让系统中在线的员工可以实时接收到系统通知。借此学习一下WebSocket,📝 每日一言:学习如一粒种子,只有努力播种才会有收获。
☀️ 今日天气:2022-11-19 多云 满是灰色的🤫
文章目录
效果演示🌈😁
注意:因为是个实现效果的小demo,所以用了若依开源框架快速集成。也借此机会使用一下若依其他生态项目。gif有些掉帧😮
话不多说直接上代码
WebSocket核心业务类
WebSocket.java
//springboot 组建注解
@Component
//核心socket路径注解
@ServerEndpoint("/websocket/{userId}")
@CrossOrigin
@Lazy
//此注解相当于设置访问URL
public class WebSocket {
//注入session对象用来读取发送消息
private Session session;
//因为涉及到发布短信接收短信还有一些短信的状态在数据库所以引入了messageService
//这里遇到一个问题,就是socket每次连接会创建多个实例 但是引入了注入到Spring中的bean
//普通的 @Autowired 注解注入不进来,所以额外写了一个读取Bean的类下面会有
ICclMessageService cclMessageService = (ICclMessageService) SpringContext.getBean("cclMessageServiceImpl");
//初始化socket实例,建立连接
//userId 这个参数自定义,根据路径上面的值而定,可以根据业务修改或者多参传入
@OnOpen
public void onOpen(Session session,@PathParam("userId")Integer userId) {
this.session = session;
//将连接添加到sockets池
//CurPool是自定义的一个socket池,将他们集中管理起来
CurPool.webSockets.put(userId,this);
System.out.println("【websocket消息】有新的连接,总数为:"+CurPool.webSockets.size());
//下面是业务逻辑,每次连接会查询没有发送的通知,并接收通知
LambdaQueryWrapper<CclMessage> lq =new LambdaQueryWrapper<>();
lq.eq(CclMessage::getSysUserId,userId);
lq.eq(CclMessage::getSendStatus,0);
//查询离线的时候发送过来的通知
List<CclMessage> list = cclMessageService.list(lq);
if (list!=null){
//进行遍历发送
List<String> msgList =new ArrayList<>();
for (CclMessage cclMessage : list) {
String mes = JSON.toJSONString(cclMessage);
msgList.add(mes);
cclMessage.setSendStatus(1);
}
//调用发送多条消息方法,进行通知
sendAllMessageByUserId(msgList,userId);
//发送状态改变
cclMessageService.updateBatchById(list);
}
}
//连接关闭走的方法
@OnClose
public void onClose() {
// 断开连接删除用户删除session
Integer userId = Integer.parseInt(this.session.getRequestParameterMap().get("userId").get(0));
//从socket池中移除实例
CurPool.webSockets.remove(userId);
System.out.println("【websocket消息】连接断开,总数为:"+CurPool.webSockets.size());
}
//发送消息实例(处理发送消息类型,进行发送)
@OnMessage
public void onMessage(String message) {
MessageVo messageVo = JSON.parseObject(message, MessageVo.class);
System.out.println("现在存活个数"+CurPool.webSockets.size());
//发送消息
String message1 = JSON.toJSONString(messageVo);
String[] userIds = messageVo.getUserIds().split(",");
Integer[] userIdsInt = new Integer[userIds.length==0?1:userIds.length];
int count =0;
for (String userId : userIds) {
userIdsInt[count] = Integer.valueOf(userId);
count++;
}
sendByUserIdMessage(message1,userIdsInt);
}
// 指定多人通知 (具体发送消息方法)
public void sendByUserIdMessage(String message,Integer[] userIds) {
//遍历发送的用户id,给他们发送消息
for (Integer userId : userIds) {
//在socket池中获取实例
WebSocket webSocket = CurPool.webSockets.get(userId);
//如果为null则说明不在线
if (webSocket == null){
System.out.println(userId+"需要稍后通知!!!");
}else {
try {
//发送消息给指定用户
webSocket.session.getAsyncRemote().sendText(message);
System.out.println("用户名为"+userId+"已经发送完:"+message);
//修改数据库中发送状态
LambdaQueryWrapper<CclMessage> lq =new LambdaQueryWrapper<CclMessage>();
lq.eq(CclMessage::getSysUserId,userId);
cclMessageService.update(new CclMessage().setSendStatus(1),lq);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
//发送多条消息给个人
public void sendAllMessageByUserId(List<String> messageList,Integer userId ){
WebSocket webSocket = CurPool.webSockets.get(userId);
if (webSocket != null){
for (String msg : messageList) {
webSocket.session.getAsyncRemote().sendText(msg);
System.out.println(msg+"消息已经发送完");
try {
Thread.sleep(500);
}catch (Exception e){
System.out.println("延迟失败");
e.printStackTrace();
}
}
}
}
}
上面的是主要的WebSocket业务处理代码,因为写的小demo,想着实现功能,逻辑可能过于不注重性能,勿喷!
重点
消息实体
MessageVo.java
public class MessageVo {
//标题
private String title;
//内容
private String content;
//用户id
private String userIds;
}
get、set、toString 我就省略了太占地方了
webSocket 配置bean
WebSocketConfig.java
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
将WebSocket注入到bean
关于session 、socket池的管理
CurPool.java
/**
* 统一管理session、websocket
*/
public class CurPool {
public static Map<Integer, WebSocket> webSockets = new ConcurrentHashMap<>();
// list 里面第一个存sessionId,第二个存session
public static Map<Integer, List<Object>> sessionPool = new ConcurrentHashMap<>();
}
主要存储session跟socket实例管理,可以通过池的管理进行范围的区分,后期可以扩展到以组为维度,并非个人。实现群发,创建部门群等操作。
多例注入单例获取bean的方法 (感谢大佬 😍)
@Component
public class SpringContext implements ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger(SpringContext.class);
private static ApplicationContext applicationContext;
private static DefaultListableBeanFactory beanFactory;
public static <T> T getBean(String name, Class<T> clazz) {
return applicationContext.getBean(name, clazz);
}
public static Object getBean(String name) {
return applicationContext.getBean(name);
}
public static DefaultListableBeanFactory getBeanFactory() {
return beanFactory;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringContext.applicationContext = applicationContext;
ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) applicationContext ;
beanFactory = (DefaultListableBeanFactory) configurableApplicationContext
.getBeanFactory();
}
public static void printBeanDefinitionNames() {
String[] beanDefinitionNames = applicationContext
.getBeanDefinitionNames();
for (int i = 0; i < beanDefinitionNames.length; i++) {
logger.info(beanDefinitionNames[i]);
}
}
public static XmlBeanDefinitionReader getXmlBeanDefinitionReader() {
return new XmlBeanDefinitionReader((BeanDefinitionRegistry) beanFactory);
}
/**
* 动态加载bean
* @param fileName
* @throws BeanDefinitionStoreException
* @throws IOException
*/
public static void loadBean(String fileName)
throws BeanDefinitionStoreException, IOException {
XmlBeanDefinitionReader beanDefinitionReader = getXmlBeanDefinitionReader();
beanDefinitionReader.setResourceLoader(applicationContext);
beanDefinitionReader.setEntityResolver(new ResourceEntityResolver(
applicationContext));
beanDefinitionReader.loadBeanDefinitions(applicationContext
.getResources(fileName));
}
public static void loadBean(String fileName, String propertyHolderBeanName)
throws BeanDefinitionStoreException, IOException {
XmlBeanDefinitionReader beanDefinitionReader = getXmlBeanDefinitionReader();
beanDefinitionReader.setResourceLoader(applicationContext);
beanDefinitionReader.setEntityResolver(new ResourceEntityResolver(
applicationContext));
beanDefinitionReader.loadBeanDefinitions(applicationContext
.getResources(fileName));
if (propertyHolderBeanName != null) {
postProcessBeanFactory(propertyHolderBeanName);
}
}
public static void postProcessBeanFactory(String propertyHolderBeanName)
throws BeanDefinitionStoreException, IOException {
BeanFactoryPostProcessor bfpp = (BeanFactoryPostProcessor) SpringContext
.getBean(propertyHolderBeanName);
bfpp.postProcessBeanFactory(SpringContext.getBeanFactory());
}
}
哈哈,这是遇到问题的时候去解决在网上大佬发布的,小弟属实佩服!!!
总结
工作以后不像原来系统化的学习,盲目去学,而是抓住工作中的需求。根据工作的需求找到合理的解决方案,在解决问题的同时升华自己,从中不断吸取知识。这强于系统化的学习很多倍。
🌈欢迎大佬们阅读,也希望可以有更好的想法弹出。学习如一粒种子,只有努力播种才会有收获。
大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje
我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚
我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/
我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden
几个月前,我读了一篇关于rubygem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题:
我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www
我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht
有人知道在发布新版本的Ruby和Rails时收到电子邮件的方法吗?他们有邮件列表,RubyonRails有一个推特,但我不想听到那些随之而来的喧嚣,我只想知道什么时候发布新版本,尤其是那些有安全修复的版本。 最佳答案 从therailsblog获取提要.http://weblog.rubyonrails.org/feed/atom.xml 关于ruby-on-rails-如何在发布新的Ruby或Rails版本时收到通知?,我们在StackOverflow上找到一个类似的问题:
华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o