草庐IT

Java语言 通过WebSocket实现实时系统通知,以后再也不能装作没看到老板的通知了~~

再来半包 2023-08-30 原文

📞 文章简介:WebSocket实时通知Demo
💡 创作目的:因为公司正在从零搭建CRM,其中有一个需求是系统通知管理,老板发布通知给员工。简单的用数据库实现感觉缺少一些实时性,不是那么生动。于是想到了使用WebSocket建立通讯,让系统中在线的员工可以实时接收到系统通知。借此学习一下WebSocket,

📝 每日一言:学习如一粒种子,只有努力播种才会有收获。

☀️ 今日天气:2022-11-19 多云 满是灰色的🤫

文章目录

效果演示🌈😁

注意:因为是个实现效果的小demo,所以用了若依开源框架快速集成。也借此机会使用一下若依其他生态项目。gif有些掉帧😮

WebSocket核心代码

话不多说直接上代码

WebSocket核心业务类

WebSocket.java


//springboot 组建注解
@Component
//核心socket路径注解
@ServerEndpoint("/websocket/{userId}")
@CrossOrigin
@Lazy
//此注解相当于设置访问URL
public class WebSocket {
	//注入session对象用来读取发送消息
    private Session session;


	//因为涉及到发布短信接收短信还有一些短信的状态在数据库所以引入了messageService
	//这里遇到一个问题,就是socket每次连接会创建多个实例 但是引入了注入到Spring中的bean
	//普通的 @Autowired 注解注入不进来,所以额外写了一个读取Bean的类下面会有
    ICclMessageService cclMessageService = (ICclMessageService) SpringContext.getBean("cclMessageServiceImpl");



	//初始化socket实例,建立连接
	//userId 这个参数自定义,根据路径上面的值而定,可以根据业务修改或者多参传入
    @OnOpen
    public void onOpen(Session session,@PathParam("userId")Integer userId) {
        this.session = session;
        //将连接添加到sockets池
        //CurPool是自定义的一个socket池,将他们集中管理起来
        CurPool.webSockets.put(userId,this);
        System.out.println("【websocket消息】有新的连接,总数为:"+CurPool.webSockets.size());
        //下面是业务逻辑,每次连接会查询没有发送的通知,并接收通知
        LambdaQueryWrapper<CclMessage> lq =new LambdaQueryWrapper<>();
        lq.eq(CclMessage::getSysUserId,userId);
        lq.eq(CclMessage::getSendStatus,0);
        //查询离线的时候发送过来的通知
        List<CclMessage> list = cclMessageService.list(lq);
        if (list!=null){
        	//进行遍历发送
            List<String> msgList =new ArrayList<>();
            for (CclMessage cclMessage : list) {
                String mes = JSON.toJSONString(cclMessage);
                msgList.add(mes);
                cclMessage.setSendStatus(1);
            }
            //调用发送多条消息方法,进行通知
            sendAllMessageByUserId(msgList,userId);
            //发送状态改变
            cclMessageService.updateBatchById(list);
        }
    }

	//连接关闭走的方法
    @OnClose
    public void onClose() {
        // 断开连接删除用户删除session
        Integer userId = Integer.parseInt(this.session.getRequestParameterMap().get("userId").get(0));
        //从socket池中移除实例
        CurPool.webSockets.remove(userId);
        System.out.println("【websocket消息】连接断开,总数为:"+CurPool.webSockets.size());
    }
	//发送消息实例(处理发送消息类型,进行发送)
    @OnMessage
    public void onMessage(String message) {
        MessageVo messageVo = JSON.parseObject(message, MessageVo.class);
        System.out.println("现在存活个数"+CurPool.webSockets.size());
        //发送消息
        String message1 = JSON.toJSONString(messageVo);
        String[] userIds = messageVo.getUserIds().split(",");
        Integer[] userIdsInt = new Integer[userIds.length==0?1:userIds.length];
        int count =0;
        for (String userId : userIds) {
            userIdsInt[count] = Integer.valueOf(userId);
            count++;
        }
        sendByUserIdMessage(message1,userIdsInt);
    }


    // 指定多人通知 (具体发送消息方法)
    public void sendByUserIdMessage(String message,Integer[] userIds) {
    	//遍历发送的用户id,给他们发送消息
        for (Integer userId : userIds) {
        	//在socket池中获取实例
            WebSocket webSocket = CurPool.webSockets.get(userId);
            //如果为null则说明不在线
            if (webSocket == null){
                System.out.println(userId+"需要稍后通知!!!");
            }else {
                try {
                	//发送消息给指定用户
                    webSocket.session.getAsyncRemote().sendText(message);
                    System.out.println("用户名为"+userId+"已经发送完:"+message);
                    //修改数据库中发送状态
                    LambdaQueryWrapper<CclMessage> lq =new LambdaQueryWrapper<CclMessage>();
                    lq.eq(CclMessage::getSysUserId,userId);
                    cclMessageService.update(new CclMessage().setSendStatus(1),lq);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }



    //发送多条消息给个人
    public  void  sendAllMessageByUserId(List<String> messageList,Integer userId ){
        WebSocket webSocket = CurPool.webSockets.get(userId);
        if (webSocket != null){
            for (String msg : messageList) {
                webSocket.session.getAsyncRemote().sendText(msg);
                System.out.println(msg+"消息已经发送完");
                try {
                    Thread.sleep(500);
                }catch (Exception e){
                    System.out.println("延迟失败");
                    e.printStackTrace();
                }
            }
        }

    }

}

上面的是主要的WebSocket业务处理代码,因为写的小demo,想着实现功能,逻辑可能过于不注重性能,勿喷!
重点

  • @ServerEndpoint(“/websocket/{userId}”) 这是定义ws连接路径,根据实际业务自定义参数传递及名称
  • @OnOpen 连接进来以后初始化session实例的方法,主要走一些实例存储等逻辑
  • @OnClose 连接关闭后需要进行的业务逻辑,比如说在池中删除实例、界面对应响应等等等
  • @OnMessage 做一些发送消息,接收消息的业务逻辑

消息实体

MessageVo.java

public class MessageVo {
	//标题
    private String title;
    //内容
    private String content;
    //用户id
    private String userIds;
    }

get、set、toString 我就省略了太占地方了

webSocket 配置bean

WebSocketConfig.java

@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

将WebSocket注入到bean

关于session 、socket池的管理

CurPool.java

/**
 * 统一管理session、websocket
 */
public class CurPool {

    public static Map<Integer, WebSocket> webSockets = new ConcurrentHashMap<>();
    // list 里面第一个存sessionId,第二个存session
    public static Map<Integer, List<Object>> sessionPool = new ConcurrentHashMap<>();
}

主要存储session跟socket实例管理,可以通过池的管理进行范围的区分,后期可以扩展到以组为维度,并非个人。实现群发,创建部门群等操作。

多例注入单例获取bean的方法 (感谢大佬 😍)


@Component
public class SpringContext implements ApplicationContextAware {



	private static final Logger logger = LoggerFactory.getLogger(SpringContext.class);

	private static ApplicationContext applicationContext;

	private static DefaultListableBeanFactory beanFactory;

	public static <T> T getBean(String name, Class<T> clazz) {

		return applicationContext.getBean(name, clazz);
	}

	public static Object getBean(String name) {

		return applicationContext.getBean(name);
	}


	public static DefaultListableBeanFactory getBeanFactory() {
		return beanFactory;
	}

	public static ApplicationContext getApplicationContext() {

		return applicationContext;
	}

	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

		SpringContext.applicationContext = applicationContext;

		ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) applicationContext ;
		beanFactory = (DefaultListableBeanFactory) configurableApplicationContext
				.getBeanFactory();
	}


	public static void printBeanDefinitionNames() {
		String[] beanDefinitionNames = applicationContext
				.getBeanDefinitionNames();
		for (int i = 0; i < beanDefinitionNames.length; i++) {
			logger.info(beanDefinitionNames[i]);
		}
	}


	public static XmlBeanDefinitionReader getXmlBeanDefinitionReader() {
		return new XmlBeanDefinitionReader((BeanDefinitionRegistry) beanFactory);
	}

	/**
	 * 动态加载bean
	 * @param fileName
	 * @throws BeanDefinitionStoreException
	 * @throws IOException
	 */
	public static void loadBean(String fileName)
			throws BeanDefinitionStoreException, IOException {

		XmlBeanDefinitionReader beanDefinitionReader = getXmlBeanDefinitionReader();
		beanDefinitionReader.setResourceLoader(applicationContext);
		beanDefinitionReader.setEntityResolver(new ResourceEntityResolver(
				applicationContext));
		beanDefinitionReader.loadBeanDefinitions(applicationContext
				.getResources(fileName));

	}

	public static void loadBean(String fileName, String propertyHolderBeanName)
			throws BeanDefinitionStoreException, IOException {

		XmlBeanDefinitionReader beanDefinitionReader = getXmlBeanDefinitionReader();
		beanDefinitionReader.setResourceLoader(applicationContext);
		beanDefinitionReader.setEntityResolver(new ResourceEntityResolver(
				applicationContext));
		beanDefinitionReader.loadBeanDefinitions(applicationContext
				.getResources(fileName));

		if (propertyHolderBeanName != null) {
			postProcessBeanFactory(propertyHolderBeanName);
		}
	}

	public static void postProcessBeanFactory(String propertyHolderBeanName)
			throws BeanDefinitionStoreException, IOException {
		BeanFactoryPostProcessor bfpp = (BeanFactoryPostProcessor) SpringContext
				.getBean(propertyHolderBeanName);
		bfpp.postProcessBeanFactory(SpringContext.getBeanFactory());
	}
}

哈哈,这是遇到问题的时候去解决在网上大佬发布的,小弟属实佩服!!!

总结

工作以后不像原来系统化的学习,盲目去学,而是抓住工作中的需求。根据工作的需求找到合理的解决方案,在解决问题的同时升华自己,从中不断吸取知识。这强于系统化的学习很多倍。

🌈欢迎大佬们阅读,也希望可以有更好的想法弹出。学习如一粒种子,只有努力播种才会有收获。

有关Java语言 通过WebSocket实现实时系统通知,以后再也不能装作没看到老板的通知了~~的更多相关文章

  1. ruby-on-rails - Rails 常用字符串(用于通知和错误信息等) - 2

    大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje

  2. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  3. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  4. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  5. ruby - 寻找通过阅读代码确定编程语言的ruby gem? - 2

    几个月前,我读了一篇关于ruby​​gem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题:

  6. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  7. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

  8. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  9. ruby-on-rails - 如何在发布新的 Ruby 或 Rails 版本时收到通知? - 2

    有人知道在发布新版本的Ruby和Rails时收到电子邮件的方法吗?他们有邮件列表,RubyonRails有一个推特,但我不想听到那些随之而来的喧嚣,我只想知道什么时候发布新版本,尤其是那些有安全修复的版本。 最佳答案 从therailsblog获取提要.http://weblog.rubyonrails.org/feed/atom.xml 关于ruby-on-rails-如何在发布新的Ruby或Rails版本时收到通知?,我们在StackOverflow上找到一个类似的问题:

  10. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

随机推荐