Python Tornado 实现SSE服务端主动推送方案

一、SSE 服务端消息推送

SSEServer-Sent Events 的简称, 是一种服务器端到客户端(浏览器)的单项消息推送。对应的浏览器端实现 Event Source 接口被制定为HTML5 的一部分。相比于 WebSocket,服务器端和客户端工作量都要小很多、简单很多,而 Tornado 又是Python中的一款优秀的高性能web框架,本文带领大家一起实践下 Tornado SSE 的实现。

本文主要探索两个方面的实践:一个是客户端发送请求,服务端的返回是分多次进行传输的,直到传输完成,这种情况下请求结束后,就可以考虑关闭 SSE了,所以这种连接可以认为是暂时的。另一种是由服务端在特定的时机下主动推送消息给到客户端,推送的时机具有不确定性,随时性,所以这种情况下需要客户端和服务端保持长久连接。

本次使用的 Tornado 版本:

tornado==6.3.2

二、短暂性场景下的 SSE 实现

短暂性场景下就是对应上面的第一点,客户端主动发送请求后,服务端分多次传输,直到完成,数据获取完成后连接就可以断开了,适用于一些接口复杂,操作步骤多的场景,可以提前告诉客户端现在进行到了哪一步了,并且这种方式也有利于服务端的横向扩展。

Tornado 中实现,需要注意的是要关闭 _auto_finish ,这样的话就不会被框架自己主动停止连接了,下面是一个实现的案例:

import time
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
import tornado.gen
from concurrent.futures.thread import ThreadPoolExecutorclass SSE(RequestHandler):def initialize(self):# 关闭自动结束self._auto_finish = Falseprint("initialize")def set_default_headers(self):# 设置为事件驱动模式self.set_header('Content-Type', "text/event-stream")# 不使用缓存self.set_header('Content-Control', "no-cache")# 保持长连接self.set_header('Connection', "keep-alive")# 允许跨域self.set_header('Access-Control-Allow-Origin', "*")def prepare(self):# 准备线程池self.executor = self.application.pool@tornado.gen.coroutinedef get(self):result = yield self.doHandle()self.write(result)# 结束self.finish()@run_on_executordef doHandle(self):tornado.ioloop.IOLoop.current()# 分十次推送信息for i in range(10):time.sleep(1)self.flush()self.callback(f"current: {i}")return f"data: end\n\n"def callback(self, message):# 事件推送message = f"data: {message}\n\n"self.write(message)self.flush()class Application(tornado.web.Application):def __init__(self):handlers = [("/sse", SSE),("/(.*)$", tornado.web.StaticFileHandler, {"path": "resources/static","default_filename": "index.html"})]super(Application, self).__init__(handlers)self.pool = ThreadPoolExecutor(200)def startServer(port):app = Application()httpserver = tornado.httpserver.HTTPServer(app)httpserver.listen(port)print(f"Start server success", f"The prot = {port}")tornado.ioloop.IOLoop.current().start()if __name__ == '__main__':startServer(8020)

运行后可以到浏览器访问:http://localhost:8020/sse,此时就可以看到服务端在不断地推送数据过来了:

在这里插入图片描述

那如何在前端用 JS 获取数据呢,前面提到在 JS 层面,有封装好的 Event Source 组件可以直接拿来使用,例如:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>测试服务器推送技术</title>
</head>
<body><div id="messages"></div>
</body>
<script>const eventSource = new EventSource('http://localhost:8020/sse');// 事件回调eventSource.onmessage = (event) => {console.log(event.data)const messagesDiv = document.getElementById('messages');messagesDiv.innerHTML += '<p>' + event.data + '</p>';};// 异常eventSource.onerror = (error) => {console.error('EventSource failed:', error);eventSource.close();};eventSource.onopen = ()=>{console.log("开启")}</script>
</html>

运行后可以看到服务端分阶段推送过来的数据:

在这里插入图片描述

三、长连接场景下的 SSE 实现

上面实现了客户端请求后,分批次返回,但是有些情况下是客户端连接后没有东西返回,而是在某个特定的时机下返回给某几个客户端,所以这种情况,我们需要和客户端保持长久的连接,同时进行客户端连接的缓存,因为同时有可能有 100 个用户,但是推送时可能只需要给 10 个用户推送,这种方式相当于将一个客户端和一个服务端进行了绑定,一定程度上不利于服务端的横向扩展,但也可以通过一些消息订阅的方式解决类似问题。

下面是一个实现案例:

import time
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
import tornado.gen
from concurrent.futures.thread import ThreadPoolExecutor# 单例
def singleton(cls):instances = {}def wrapper(*args, **kwargs):if cls not in instances:instances[cls] = cls(*args, **kwargs)return instances[cls]return wrapper# 订阅推送工具类
@singleton
class Pusher():def __init__(self):self.clients = {}def add_client(self, client_id, callback):if client_id not in self.clients:self.clients[client_id] = callbackprint(f"{client_id} 连接")def send_all(self, message):for client_id in self.clients:callback = self.clients[client_id]print("发送消息给:", client_id)callback(message)def send(self, client_id, message):callback = self.clients[client_id]print("发送消息给:", client_id)callback(message)class SSE(RequestHandler):# 定义推送者pusher = Pusher()def initialize(self):# 关闭自动结束self._auto_finish = Falseprint("initialize")def set_default_headers(self):# 设置为事件驱动模式self.set_header('Content-Type', "text/event-stream")# 不使用缓存self.set_header('Content-Control', "no-cache")# 保持长连接self.set_header('Connection', "keep-alive")# 允许跨域self.set_header('Access-Control-Allow-Origin', "*")@tornado.gen.coroutinedef get(self):# 客户端唯一标识client_id = self.get_argument("client_id")self.pusher.add_client(client_id, self.callback)def callback(self, message):# 事件推送message = f"data: {message}\n\n"self.write(message)self.flush()# 定义推送接口,模拟推送
class Push(RequestHandler):# 定义推送者pusher = Pusher()def prepare(self):# 准备线程池self.executor = self.application.pool@tornado.gen.coroutinedef get(self):# 客户端标识client_id = self.get_argument("client_id")# 推送的消息message = self.get_argument("message")result = yield self.doHandle(client_id, message)self.write(result)@run_on_executordef doHandle(self, client_id, message):tornado.ioloop.IOLoop.current()self.pusher.send(client_id, message)return "success"class Application(tornado.web.Application):def __init__(self):handlers = [("/sse", SSE),("/push", Push),("/(.*)$", tornado.web.StaticFileHandler, {"path": "resources/static","default_filename": "index.html"})]super(Application, self).__init__(handlers)self.pool = ThreadPoolExecutor(200)def startServer(port):app = Application()httpserver = tornado.httpserver.HTTPServer(app)httpserver.listen(port)print(f"Start server success", f"The prot = {port}")tornado.ioloop.IOLoop.current().start()if __name__ == '__main__':startServer(8020)

这里我定义了一个 Pusher 订阅推送工具类,用来存储客户端的连接,以及给指定客户端或全部客户端发送消息,然后我又定义 Push 接口,模拟不定时的指定客户端发送信息的场景。

同样前端也要修改,需要给自己定义 client_id ,例如:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>测试服务器推送技术</title>
</head>
<body><div id="client"></div><div id="messages"></div>
</body>
<script>function generateUUID() {let uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {const r = Math.random() * 16 | 0;const v = c === 'x' ? r : (r & 0x3 | 0x8);return v.toString(16);});return uuid;}// 利用uuid 模拟生成唯一的客户端IDlet client_id = generateUUID();document.getElementById('client').innerHTML = "当前 client_id = "+client_id;const eventSource = new EventSource('http://localhost:8020/sse?client_id='+client_id);// 事件回调eventSource.onmessage = (event) => {console.log(event.data)const messagesDiv = document.getElementById('messages');messagesDiv.innerHTML += '<p>' + event.data + '</p>';};// 异常eventSource.onerror = (error) => {console.error('EventSource failed:', error);eventSource.close();};eventSource.onopen = ()=>{console.log("开启")}</script>
</html>

这里我用 uuid 模拟客户端的唯一ID,在真实使用时可不要这么做。

下面使用浏览器打开三个页面,可以看到三个不同的 client_id :

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在服务端的日志中也能看到这三个客户端的连接:

在这里插入图片描述

下面调用 push 接口来给任意一个客户端发送消息,例如这里发给client_id = 2493045e-84dd-4118-8d96-0735c4ac186b 的用户 :

在这里插入图片描述

下面看到 client_id2493045e-84dd-4118-8d96-0735c4ac186b的页面:

在这里插入图片描述
已经成功收到推送的消息,反之看另外两个:

在这里插入图片描述
在这里插入图片描述
都没有消息,到这里就实现了长连接下不定时的服务端消息推送方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/422474.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【网络安全 -> 防御与保护】专栏文章索引

为了方便 快速定位 和 便于文章间的相互引用等 作为一个快速准确的导航工具 网络安全——防御与保护 &#xff08;一&#xff09;.信息安全概述

VSCode插件 —— Cody AI (免费AI助手!)

之前介绍过一款 阿里云免费的AI开发工具——通义灵码 TONGYI Lingma 本文再推荐一个可以极大提高开发前端开发效率的工具 —— Cody AI &#xff08;Sourcegraph&#xff09;&#xff0c;同样是免费的&#xff01; 不过&#xff0c;使用Cody AI需要有github 或 Google 、 git…

xshell配置隧道转移规则

钢铁知识库&#xff0c;一个学习python爬虫、数据分析的知识库。人生苦短&#xff0c;快用python。 xshell是什么 通俗点说就是一款强大ssh远程软件&#xff0c;可以方便运维人员对服务器进行管理操作&#xff0c;功能很多朋友们自行探索&#xff0c;今天只聊其中一个功能点那…

CentOs7 安装Mysql(5.7和8.0版本)密码修改跳过 超详细教程

CSDN 成就一亿技术人&#xff01; 今天出一期Centos下安装Mysql&#xff08;详细教程&#xff09;包括数据库密码跳过修改 CSDN 成就一亿技术人&#xff01; 目录 1.获取安装包 2.安装程序 安装下载的rpm包 查看安装包 修改5.7版本&#xff08;重要&#xff09; 安装M…

Vue3+ElementUI 多选框中复选框和名字点击方法效果分离

现在的需求为 比如我点击了Option A &#xff0c;触发点击Option A的方法&#xff0c;并且复选框不会取消勾选&#xff0c;分离的方法。 <el-checkbox-group v-model"mapWork.model_checkArray.value"> <div class"naipTypeDom" v-for"item …

常用芯片学习——HC245芯片

HC245三态输出八路总线收发器 使用说明 这些八路总线收发器专为数据总线之间的异步双向通信而设计。控制功能实现可更大限度地减少外部时序要求。根据方向控制 (DIR) 输入上的逻辑电平&#xff0c;此类器件将数据从 A 总线发送至 B 总线&#xff0c;或者将数据从 B 总线发送至…

UCIE协议介绍--芯粒间互联标准

UCIE协议介绍--芯粒间互联标准 1 背景2 UCIE协议介绍2.1 协议层2.2 适配层2.3 物理层2.4 D2D接口 3 Transmission3.1 SideBand数据包3.2 SideBand包格式3.2.1 MRd/Mwr/CfgRd/CfgWr3.2.2 Completion3.2.3 Message 3.3 FDI接口信号 4 链路训练4.1 PHY LSM状态介绍 1 背景 为什么…

Spring boot项目java bean和xml互转

Spring boot项目实现java bean和xml互转 项目场景&#xff1a;互转方法使用jackson进行互转使用jaxws进行xml与bean的互转 搞定收工&#xff01; 项目场景&#xff1a; 工作中需要给下游第三方收费系统做数据挡板&#xff0c;由于下游系统使用的是soap webservice,里面涉及各种…

【开源】基于JAVA语言的陕西非物质文化遗产网站

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 设计目标2.2 研究内容2.3 研究方法与过程2.3.1 系统设计2.3.2 查阅文献2.3.3 网站分析2.3.4 网站设计2.3.5 网站实现2.3.6 系统测试与效果分析 三、系统展示四、核心代码4.1 查询民间文学4.2 查询传统音乐4.3 增改传统舞…

基于springboot+vue的网上点餐系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 背景和意…

大数据技术原理及应用课实验6 :熟悉Hive的基本操作

目录 一、实验目的 二、实验平台 三、数据集 四、实验步骤&#xff08;每个步骤下均需有运行截图&#xff09; &#xff08;1&#xff09;创建一个内部表stocks&#xff0c;字段分隔符为英文逗号&#xff0c;表结构如表14-11所示。 (2)创建一个外部分区表dividends&#x…

Leetcode刷题笔记题解(C++):670. 最大交换

思路&#xff1a; 假设数字 9923676 从右边找最大的数字的下标maxindex&#xff0c;然后向左边寻找小于最大数字的数的下标&#xff0c;直到找到最左边&#xff0c;交换两者得出新的数字&#xff0c;比如从左到右递减的数字如9621则不需要变化&#xff0c;在寻找中记录这种数…