文章目录
后端向前端推送信息,通知任务完成
| 轮询 | SSE | WebSocket | |
|---|---|---|---|
| 请求方式 | HTTP | HTTP | TCP长连接 |
| 触发方式 | 轮询 | 事件 | 事件 |
| 优点 | 实现简单易兼容 | 实现简单开发成本低 | 全双工通信,开销小,安全,可扩展 |
| 缺点 | 消耗较大 | 不兼容IE | 传输数据需二次解析,开发成本大 |
| 适用场景 | 服务端向客户端单向推送 | 网络游戏、银行交互、支付 |
pip install flask
main.py
import time
import threading
from flask_cors import CORS
from flask import Flask, redirect
app = Flask(__name__)
cors = CORS(app)
job = {} # 任务状态
def do_job(id):
global job
job[id] = 'doing'
time.sleep(5)
job[id] = 'done'
@app.route('/job/<id>', methods=['POST'])
def create(id):
"""创建任务"""
threading.Thread(target=do_job, args=(id,)).start()
response = redirect(f'/job/{id}') # 重定向到查询该任务状态
return response
@app.route('/job/<id>', methods=['GET'])
def status(id):
"""查询任务状态"""
return job.get(id, 'not exist')
if __name__ == '__main__':
app.run()
index.html
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<title>轮询</title>
<script src="https://cdn.staticfile.org/jquery/3.6.1/jquery.min.js"></script>
</head>
<body>
<button id="create">执行任务</button>
</body>
<script>
$("#create").click(function () {
var id = parseInt(Math.random() * 100000000); // 任务ID
$.post({
url: "http://127.0.0.1:5000/job/" + id.toString(),
success: function (response) {
$("body").append("<p id='p" + id.toString() + "'>任务" + id.toString() + ":created</p>");
var interval = setInterval(function () {
$.get({
url: "http://127.0.0.1:5000/job/" + id.toString(),
success: function (response) {
console.log(response);
$("#p" + id.toString()).text("任务" + id.toString() + ":" + response)
if (response === 'done') {
clearInterval(interval);
}
}
});
}, 1000);
}
});
});
</script>
</html>
效果

需要异步启动 + Redis
gunicorn 无法在 Windows 上运行,WSL 对 gevent 支持不友好,建议在纯 Linux 系统下使用
安装 Redis
sudo apt update
sudo apt install redis-server
安装
pip install flask-sse gunicorn gevent
sse.py
from flask import Flask, render_template
from flask_sse import sse
from flask_cors import CORS
app = Flask(__name__)
app.config['REDIS_URL'] = 'redis://localhost'
app.register_blueprint(sse, url_prefix='/stream')
cors = CORS(app)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/hello')
def publish_hello():
sse.publish({'message': 'Hello!'}, type='greeting')
return 'Message sent!'
templates/index.html
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<title>SSE</title>
<script src="https://cdn.staticfile.org/jquery/3.6.1/jquery.min.js"></script>
</head>
<body>
<h1>Flask-SSE Quickstart</h1>
<script>
var source = new EventSource("stream");
source.addEventListener("greeting", function (event) {
var data = JSON.parse(event.data);
console.log(data.message)
$("body").append("<p>" + data.message + "</p>");
}, false);
source.addEventListener("error", function (event) {
console.log("Failed to connect to event stream. Is Redis running?");
}, false);
</script>
</body>
</html>
启动
gunicorn sse:app --worker-class gevent --bind 127.0.0.1:8000
nginx 配置
location ^~ /sse/ {
proxy_pass http://127.0.0.1:8000/;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
}
效果

异步启动,eventlet 性能最好,然后是 gevent
pip install flask-socketio gunicorn eventlet
或
pip install flask-socketio gunicorn gevent-websocket
main.py
from flask_socketio import SocketIO
from flask import Flask, render_template, request
app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins='*')
connected_sids = set() # 存放已连接的客户端
@app.route('/')
def index():
return render_template('index.html')
@socketio.on('connect')
def on_connect():
connected_sids.add(request.sid)
print(f'{request.sid} 已连接')
@socketio.on('disconnect')
def on_disconnect():
connected_sids.remove(request.sid)
print(f'{request.sid} 已断开')
@socketio.on('message')
def handle_message(message):
"""收消息"""
data = message['data']
print(f'{request.sid} {data}')
@app.route('/hello', defaults={'sid': None})
@app.route('/hello/<sid>')
def hello(sid):
"""发消息"""
if sid:
if sid in connected_sids:
socketio.emit('my_response', {'data': f'Hello, {sid}!'}, room=sid)
return f'已发信息给{sid}'
else:
return f'{sid}不存在'
else:
socketio.emit('my_response', {'data': 'Hello!'})
return '已群发信息'
if __name__ == '__main__':
socketio.run(app)
templates/index.html
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<title>WebSocket</title>
<script src="https://cdn.staticfile.org/jquery/3.6.1/jquery.min.js"></script>
<script src="https://cdn.staticfile.org/socket.io/4.5.2/socket.io.min.js"></script>
</head>
<body>
<h1>Flask-SocketIO Quickstart</h1>
<h2 id="sid">客户端</h2>
<h2>发消息</h2>
<input id="emit_data" value="Hello World!">
<button id="emit">发消息</button>
<h2>收消息</h2>
<div id="log"></div>
<script>
var socket = io();
// var socket = io("ws://127.0.0.1:5000");
socket.on("connect", function () {
$("#sid").text("客户端:" + socket.id)
});
$("#emit").click(function () {
socket.emit("message", {data: $("#emit_data").val()});
}); // 点击按钮发消息
socket.on("my_response", function (msg) {
$("#log").append("<p>" + msg.data + "</p>"); // 收消息
});
</script>
</body>
</html>
效果

更多内容查阅官方示例
WebSocekt 是 HTML5 规范的一部分,是一种应用层协议,借鉴了 socket 思想,为客户端和服务端之间提供了双向通信功能,包含一套标准的 API。
Socket.IO 是一个 JavaScript 库,不仅支持 WebSocket,还支持许多种轮询机制,当 Socket.IO 检测到当前环境不支持 WebSocket 时,能自动选择最佳方式实现网络实时通信。
后端 Flask-Sockets 对应前端使用原生 WebSocekt
后端 Flask-SocketIO 对应前端使用 Socket.IO(推荐这种)
大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList()Obt
我遇到了这个奇怪的错误.../Users/gideon/Documents/ca_ruby/rubytactoe/lib/player.rb:13:in`gets':Isadirectory-spec(Errno::EISDIR)player_spec.rb:require_relative'../spec_helper'#theuniverseisvastandinfinite...itcontainsagame....butnoplayersdescribe"tictactoegame"docontext"theplayerclass"doit"musthaveahumanplay
我有一个super简单的脚本,它几乎包含了FayeWebSocketGitHub页面上用于处理关闭连接的内容:ws=Faye::WebSocket::Client.new(url,nil,:headers=>headers)ws.on:opendo|event|p[:open]#sendpingcommand#sendtestcommand#ws.send({command:'test'}.to_json)endws.on:messagedo|event|#hereistheentrypointfordatacomingfromtheserver.pJSON.parse(event.d
我有一个数组数组,想将元素附加到子数组。+=做我想做的,但我想了解为什么push不做。我期望的行为(并与+=一起工作):b=Array.new(3,[])b[0]+=["apple"]b[1]+=["orange"]b[2]+=["frog"]b=>[["苹果"],["橙子"],["Frog"]]通过推送,我将推送的元素附加到每个子数组(为什么?):a=Array.new(3,[])a[0].push("apple")a[1].push("orange")a[2].push("frog")a=>[[“苹果”、“橙子”、“Frog”]、[“苹果”、“橙子”、“Frog”]、[“苹果”、“
我有两个文本文件,master.txt和926.txt。如果926.txt中有一行不在master.txt中,我想写入一个新文件notinbook.txt。我写了我能想到的最好的东西,但考虑到我是一个糟糕的/新手程序员,它失败了。这是我的东西g=File.new("notinbook.txt","w")File.open("926.txt","r")do|f|while(line=f.gets)x=line.chompifFile.open("master.txt","w")do|h|endwhile(line=h.gets)ifline.chomp!=xputslineendende
我使用raise(ConfigurationError.new(msg))引发错误我试着用rspec测试一下:expect{Base.configuration.username}.toraise_error(ConfigurationError,message)但这行不通。我该如何测试呢?目标是匹配message。 最佳答案 您可以使用正则表达式匹配错误消息:it{expect{Foo.bar}.toraise_error(NoMethodError,/private/)}这将检查NoMethodError是否由privateme
我最近一直在查看一些gem的源代码。我经常看到的一个习惯用法是使用嵌套模块,其中包含连接到版本字符串中的版本常量,即围绕此类事物的变体:moduleChunkyBaconmoduleVersionMAJOR=0MINOR=6TINY=2endVERSION=[Version::MAJOR,Version::MINOR,Version::TINY].compact*'.'end以这种方式存储库版本信息有什么好处(如果有的话)?为什么不这样做:moduleChunkyBaconVERSION='0.6.2'.freezeend 最佳答案
如何使用如下两个数组构建一个数组:名称=[a,b,c]how_many_of_each[3,5,2]得到my_array=[a,a,a,b,b,b,b,b,c,c] 最佳答案 使用zip、flat_map和数组乘法:irb(main):001:0>value=[:a,:b,:c]=>[:a,:b,:c]irb(main):002:0>times=[3,5,2]=>[3,5,2]irb(main):003:0>value.zip(times).flat_map{|v,t|[v]*t}=>[:a,:a,:a,:b,:b,:b,:b,:b