websocket+定时任务实现实时推送
有时候业务要求websocket连接后,服务端实时每隔一段时间就将数据推送给客户端进行响应,这时就需要websocket+定时任务一起来实现实时推送数据给客户端了。
使用的定时任务方式为spring的TaskScheduler对象实现任务调度。
public interface TaskScheduler {
//通过触发器来决定task是否执行
ScheduledFuture schedule(Runnable task, Trigger trigger);
//在starttime的时候执行一次
ScheduledFuture schedule(Runnable task, Date startTime);
ScheduledFuture schedule(Runnable task, Instant startTime);
//从starttime开始每个period时间段执行一次task
ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period);
ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period);
//每隔period执行一次
ScheduledFuture scheduleAtFixedRate(Runnable task, long period);
ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period);
//从startTime开始每隔delay长时间执行一次
ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
//每隔delay时间执行一次
ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay);
}
import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
/**
* The type Task scheduler test.
*
* @author yjj
* @version 1.0
* @since 2022 -12-28 15:45:17
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class TaskSchedulerTest {
private final TaskScheduler taskScheduler;
@Bean
public void test() {
//每隔3秒执行一次
Trigger trigger = new CronTrigger("0/3 * * * * *");
//每隔1秒执行一次
//Trigger trigger1 = new PeriodicTrigger(1, TimeUnit.SECONDS);
taskScheduler.schedule(new MyThread(), trigger);
}
private class MyThread implements Runnable {
@Override
public void run() {
log.info("定时执行线程名称=【{}】,执行时间=【{}】", Thread.currentThread().getName(), DateUtil.date());
}
}
}
效果就是每个3秒执行一次

实现的业务需求如下:客户端连上来以后就每隔3秒向客户端实时推送消息。有关websocket的实现见文章websocket简单实现
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
/**
* 测试websocket
*
* @author yjj
* @version 1.0
* @since 2022 -12-28 14:55:29
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class TestWebsocket implements WebSocketHandler {
protected static final CopyOnWriteArrayList<WebSocketSession> WEB_SOCKET_SESSIONS = new CopyOnWriteArrayList<>();
/**
* 定时任务集合
*/
Map<String, ScheduledFuture<?>> stringScheduledFutureMap = new ConcurrentHashMap<>();
/**
* taskScheduler
*/
private final TaskScheduler taskScheduler;
/**
* 建立连接后操作
*
* @param session 连接session信息
* @throws Exception exception
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sendMessage("连接成功~~~~~~,sessionId=" + session.getId());
WEB_SOCKET_SESSIONS.add(session);
//设置定时任务,每隔3s执行一次
Trigger trigger = new CronTrigger("0/3 * * * * *");
//开启一个定时任务
ScheduledFuture<?> schedule = taskScheduler.schedule(new CustomizeTask(session.getId()), trigger);
//根据session连接id定时任务线程存到map中
stringScheduledFutureMap.put(session.getId(), schedule);
}
private class CustomizeTask implements Runnable {
private final String sessionId;
CustomizeTask(String sessionId) {
this.sessionId = sessionId;
}
@Override
public void run() {
try {
String message = CharSequenceUtil.format("定时执行线程名称=【{}】,执行时间=【{}】", Thread.currentThread().getName(), DateUtil.date());
sendMessage(JSONUtil.toJsonStr(message), sessionId);
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 接收到消息后的处理
*
* @param session 连接session信息
* @param message 信息
* @throws Exception exception
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
sendMessage("接收到的消息为=【" + message + "】,sessionId=【" + session.getId() + "】,回复消息=【你好呀!】");
}
/**
* ws连接出错时调用
*
* @param session session连接信息
* @param exception exception
* @throws Exception exception
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
if (session.isOpen()) {
sendMessage("ws连接出错,即将关闭此session,sessionId=【" + session.getId() + "】");
session.close();
}
WEB_SOCKET_SESSIONS.remove(session);
}
/**
* 连接关闭后调用
*
* @param session session连接信息
* @param closeStatus 关闭状态
* @throws Exception exception
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
if (session.isOpen()) {
sendMessage("ws连接即将关闭此session,sessionId=【" + session.getId() + "】");
session.close();
}
WEB_SOCKET_SESSIONS.remove(session);
String sessionId = session.getId();
ScheduledFuture<?> scheduledFuture = MapUtil.get(stringScheduledFutureMap, sessionId, ScheduledFuture.class);
if (scheduledFuture != null) {
//暂停对应session的开启的定时任务
scheduledFuture.cancel(true);
//集合移除
stringScheduledFutureMap.remove(sessionId);
}
}
/**
* 是否支持分片消息
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 群发发送消息
*
* @param message 消息
* @throws IOException ioException
*/
public void sendMessage(String message) throws IOException {
if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
webSocketSession.sendMessage(new TextMessage(message));
}
}
}
/**
* 发给指定连接消息
*
* @param message 消息
* @throws IOException ioException
*/
public void sendMessage(String message, String sessionId) throws IOException {
if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
if (sessionId.equals(webSocketSession.getId())) {
webSocketSession.sendMessage(new TextMessage(message));
}
}
}
}
}
import com.yjj.test.websocket.TestWebsocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import javax.annotation.Resource;
/**
* websocket配置
*
* @author yjj
* @version 1.0
* @since 2022 -12-28 15:10:11
*/
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Resource
private TestWebsocket testWebsocket;
/**
* Register {@link WebSocketHandler WebSocketHandlers} including SockJS fallback options if desired.
*
* @param registry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(testWebsocket, "/test").setAllowedOrigins("*");
}
}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* 当定时任务和websocket同时存在时报错解决
*
* @author yjj
* @version 1.0
* @since 2022 -04-28 17:35:54
*/
@Configuration
public class ScheduledConfig {
/**
* Schedule本身是单线程执行的
*
* @return the task scheduler
*/
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduling = new ThreadPoolTaskScheduler();
scheduling.setPoolSize(20);
return scheduling;
}
}

我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake
我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden
华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o
C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.
MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO
遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg
通常,数组被实现为内存块,集合被实现为HashMap,有序集合被实现为跳跃列表。在Ruby中也是如此吗?我正在尝试从性能和内存占用方面评估Ruby中不同容器的使用情况 最佳答案 数组是Ruby核心库的一部分。每个Ruby实现都有自己的数组实现。Ruby语言规范只规定了Ruby数组的行为,并没有规定任何特定的实现策略。它甚至没有指定任何会强制或至少建议特定实现策略的性能约束。然而,大多数Rubyist对数组的性能特征有一些期望,这会迫使不符合它们的实现变得默默无闻,因为实际上没有人会使用它:插入、前置或追加以及删除元素的最坏情况步骤复
我写了一个非常简单的rake任务来尝试找到这个问题的根源。namespace:foodotaskbar::environmentdoputs'RUNNING'endend当在控制台中执行rakefoo:bar时,输出为:RUNNINGRUNNING当我执行任何rake任务时会发生这种情况。有没有人遇到过这样的事情?编辑上面的rake任务就是写在那个.rake文件中的所有内容。这是当前正在使用的Rakefile。requireFile.expand_path('../config/application',__FILE__)OurApp::Application.load_tasks这里
在ruby中,你可以这样做:classThingpublicdeff1puts"f1"endprivatedeff2puts"f2"endpublicdeff3puts"f3"endprivatedeff4puts"f4"endend现在f1和f3是公共(public)的,f2和f4是私有(private)的。内部发生了什么,允许您调用一个类方法,然后更改方法定义?我怎样才能实现相同的功能(表面上是创建我自己的java之类的注释)例如...classThingfundeff1puts"hey"endnotfundeff2puts"hey"endendfun和notfun将更改以下函数定