草庐IT

websocket消息丢失解决方案

骨力 2023-06-15 原文

后台在使用websocket给前端传消息时,有时消息量过大会有数据丢失的偶发情况,websocket源码中未查看到获取消息发送成功的状态,可以如下解决。

文章目录

一、整体思路

1、后台通过websocket传输给前端消息,并且后台生成校验此消息的定时任务,设置每5秒重发
2、前端接收到消息后将消息通过websocket传输给后台
3、后台如接收到前端的消息则删除对应的发送消息定时任务,如未收到消息则继续发送,设置最多发送5次(超过5次默认认为此条消息记录有误)
4、建议:建议websocket发送消息单独为一个模块,防止定时任务过多抢占服务内存情况发生。

二、代码示例

1、线程池的配置

创建一个配置类,注入线程池的相关配置

@Configuration
public class WebConfig {
    @Bean("threadPoolTaskScheduler")
    public ThreadPoolTaskScheduler getThreadPoolTaskScheduler() {
        // 定时任务线程池
        ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
        // 线程池大小
        executor.setPoolSize(10);
        // 线程执行前缀
        executor.setThreadNamePrefix("ThreadPoolTaskScheduler-");
        // executor.setWaitForTasksToCompleteOnShutdown(true);
        // executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }

2、消息实体类

@Data
public class TestEntity {
    private String key;//每条消息key要保持唯一
    private Object value;//发送的消息内容
    private Integer sendNum;//同一条消息发送次数
}

3、手动注入所需工具类

package com.media.common.utils;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class SpringContextUtils implements ApplicationContextAware {
    /**
     * 应用上下文
     */
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextUtils.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext(){
        return applicationContext;
    }

    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException {
        return (T)applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> clz) throws BeansException {
        return (T)applicationContext.getBean(clz);
    }
}

4、定时任务编辑类

将所有的定时任务放入一个队列中,如想要停止此定时任务,直接将队列中对应的key删除即可(不同的消息要保持key唯一)

package com.media.msg.websockettest;

import com.alibaba.fastjson2.JSONObject;
import com.media.common.dto.common.ApiResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.web.bind.annotation.*;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

@RestController
public class TestController {
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    private final String cron = "0/5 * * * * ?";//5秒重发消息

    // 线程池
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    // 任务队列管理
    @SuppressWarnings("rawtypes")
    private ConcurrentHashMap<String, ScheduledFuture> futureMap = new ConcurrentHashMap<String, ScheduledFuture>();

    // 加入新的任务进来
    @SuppressWarnings({"rawtypes"})
    @PostMapping("/addSchedule")
    public ApiResult addSchedule(@RequestBody TestEntity t) {
        DelayTaskExecTest task = new DelayTaskExecTest(t);
        ScheduledFuture<?> schedule = threadPoolTaskScheduler.schedule(task, new CronTrigger(cron));
        System.out.println("新消息已添加到定时任务:" + JSONObject.toJSONString(t));
        // 加入到队列中,
        futureMap.put(t.getKey(), schedule);

        return ApiResult.success();
    }

    // 移除已有的一个任务
    @SuppressWarnings("rawtypes")
    @PostMapping("/removeSchedule")
    public ApiResult removeSchedule(@RequestParam("key") String key) {
        ScheduledFuture scheduledFuture = futureMap.get(key);
        if (scheduledFuture != null) {
            // 取消定时任务
            scheduledFuture.cancel(true);
            // 如果任务取消需要消耗点时间
            boolean cancelled = scheduledFuture.isCancelled();
            while (!cancelled) {
                scheduledFuture.cancel(true);
                System.out.println(key + "取消中");
            }
            System.out.println(key + "任务移除成功");
            // 最后从队列中删除
            futureMap.remove(key);
        }
        return ApiResult.success();
    }
}

5、定时任务处理类

在run方法中写相关逻辑,我这里是调用了websocket的消息发送接口。
此外,要注意实现了Runnable的接口@Autowired注入会为null,所以需要手动注入

package com.media.msg.websockettest;

import com.media.common.utils.SpringContextUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DelayTaskExecTest implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(DelayTaskExecTest.class);

    //在Runnable @Autowired注入会null  所以需要手动注入
    private TestController testController;

    TestEntity testEntity;

    public DelayTaskExecTest(TestEntity testEntity) {
        this.testEntity = testEntity;
    }

    @Override
    public void run() {
        //执行具体的定时任务业务逻辑
        log.info("发送websocket消息,key={},第{}次发送", testEntity.getKey(), testEntity.getSendNum());
        Integer newSendNum = testEntity.getSendNum() + 1;
        testEntity.setSendNum(newSendNum);

        //手动注入
        testController = SpringContextUtils.getApplicationContext().getBean(TestController.class);
        //这里根据自己websocket调用方法,调用对应的发送消息的接口即可
//        webSocketController.sendObjMessage("1",testEntity);

        if (newSendNum > 5) {
            //如果次数大于5,则直接关闭此消息的定时任务发送
            testController.removeSchedule(testEntity.getKey());
        }

    }
}

6、websocket消息接收处理

如果后台接收到了前端传来的消息,则将此消息在队列中删除

    @OnMessage
    public void onMessage(String message, Session session) throws Exception{

        System.out.println("接收到了前端的消息:" + message);
        //接收到前端的消息,转化为消息实体,删除对应的定时任务
        try {
            TestEntity testEntity= JSONObject.parseObject(message, TestEntity.class);
            webSocketController.removeSchedule(testEntity.getKey());
            return;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return;
    }

7、结果测试

7.1 测试定时任务超过次数自动关闭

定时任务同一条消息发送超过5次则自动关闭,直接调用生成定时任务的接口即可

结果如下

7.2 测试websocket接收消息删除定时任务

websocket接收到前端传来的消息后,删除发送消息的定时任务,用websocket在线连接方式测试

结果如下

综上,整体的流程已经完成,可以根据需要自行修改定时间隔和次数,有意见和建议欢迎留言!

有关websocket消息丢失解决方案的更多相关文章

  1. ruby - 在 jRuby 中使用 'fork' 生成进程的替代方案? - 2

    在MRIRuby中我可以这样做:deftransferinternal_server=self.init_serverpid=forkdointernal_server.runend#Maketheserverprocessrunindependently.Process.detach(pid)internal_client=self.init_client#Dootherstuffwithconnectingtointernal_server...internal_client.post('somedata')ensure#KillserverProcess.kill('KILL',

  2. ruby-on-rails - 如何在 Rails View 上显示错误消息? - 2

    我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c

  3. ruby - 使用 Ruby 通过 Outlook 发送消息的最简单方法是什么? - 2

    我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=

  4. 屏幕录制为什么没声音?检查这2项,轻松解决 - 2

    相信很多人在录制视频的时候都会遇到各种各样的问题,比如录制的视频没有声音。屏幕录制为什么没声音?今天小编就和大家分享一下如何录制音画同步视频的具体操作方法。如果你有录制的视频没有声音,你可以试试这个方法。 一、检查是否打开电脑系统声音相信很多小伙伴在录制视频后会发现录制的视频没有声音,屏幕录制为什么没声音?如果当时没有打开音频录制,则录制好的视频是没有声音的。因此,建议在录制前进行检查。屏幕上没有声音,很可能是因为你的电脑系统的声音被禁止了。您只需打开电脑系统的声音,即可录制音频和图画同步视频。操作方法:步骤1:点击电脑屏幕右下侧的“小喇叭”图案,在上方的选项中,选择“声音”。 步骤2:在“声

  5. Ruby - 如何将消息长度表示为 2 个二进制字节 - 2

    我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi

  6. 【高数】用拉格朗日中值定理解决极限问题 - 2

    首先回顾一下拉格朗日定理的内容:函数f(x)是在闭区间[a,b]上连续、开区间(a,b)上可导的函数,那么至少存在一个,使得:通过这个表达式我们可以知道,f(x)是函数的主体,a和b可以看作是主体函数f(x)中所取的两个值。那么可以有,  也就意味着我们可以用来替换 这种替换可以用在求某些多项式差的极限中。方法: 外层函数f(x)是一致的,并且h(x)和g(x)是等价无穷小。此时,利用拉格朗日定理,将原式替换为 ,再进行求解,往往会省去复合函数求极限的很多麻烦。使用要注意:1.要先找到主体函数f(x),即外层函数必须相同。2.f(x)找到后,复合部分是等价无穷小。3.要满足作差的形式。如果是加

  7. 深度学习部署:Windows安装pycocotools报错解决方法 - 2

    深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal

  8. ruby-on-rails - 在 Flash 警报 Rails 3 中显示错误消息 - 2

    如果我在模型中设置验证消息validates:name,:presence=>{:message=>'Thenamecantbeblank.'}我如何让该消息显示在闪光警报中,这是我迄今为止尝试过的方法defcreate@message=Message.new(params[:message])if@message.valid?ContactMailer.send_mail(@message).deliverredirect_to(root_path,:notice=>"Thanksforyourmessage,Iwillbeintouchsoon")elseflash[:error]

  9. ruby-on-rails - 在 RSpec 中,如何以任意顺序期望具有不同参数的多条消息? - 2

    RSpec似乎按顺序匹配方法接收的消息。我不确定如何使以下代码工作:allow(a).toreceive(:f)expect(a).toreceive(:f).with(2)a.f(1)a.f(2)a.f(3)我问的原因是a.f的一些调用是由我的代码的上层控制的,所以我不能对这些方法调用添加期望。 最佳答案 RSpecspy是测试这种情况的一种方式。要监视一个方法,用allowstub,除了方法名称之外没有任何约束,调用该方法,然后expect确切的方法调用。例如:allow(a).toreceive(:f)a.f(2)a.f(1)

  10. ruby - Faye WebSocket,关闭处理程序被触发后重新连接到套接字 - 2

    我有一个super简单的脚本,它几乎包含了FayeWebSocketGitHub页面上用于处理关闭连接的内容:ws=Faye::WebSocket::Client.new(url,nil,:headers=>headers)ws.on:opendo|event|p[:open]#sendpingcommand#sendtestcommand#ws.send({command:'test'}.to_json)endws.on:messagedo|event|#hereistheentrypointfordatacomingfromtheserver.pJSON.parse(event.d

随机推荐