服务端:
from fastapi import FastAPI from fastapi.responses import StreamingResponse from fastapi.middleware.cors import CORSMiddleware import timeapp = FastAPI()# 允许所有来源的跨域请求 app.add_middleware(CORSMiddleware,allow_origins=["*"], # 允许所有来源allow_credentials=True,allow_methods=["*"], # 允许所有HTTP方法allow_headers=["*"] # 允许所有请求头 )def generate_stream():for i in range(1, 11):yield f"data: Message {i}\n\n"time.sleep(1)@app.get("/stream") async def stream():return StreamingResponse(generate_stream(), media_type="text/event-stream")#uvicorn sse_server:app --port 1090 --reload
客户端:
import requests from requests.auth import HTTPBasicAuth# 定义事件流客户端类 class EventStreamClient:def __init__(self, url, username=None, password=None):self.url = urlself.session = requests.Session()if username and password:self.session.auth = HTTPBasicAuth(username, password)def connect(self):# 使用requests.Session来保持连接self.response = self.session.get(self.url, stream=True)self.response.raise_for_status() # 检查是否连接成功def events(self):# 生成器:逐行读取响应内容并逐一生成事件for line in self.response.iter_lines():if line:yield line.decode('utf-8').rstrip() # 去掉行尾的换行符def close(self):# 关闭会话 self.session.close()# 使用示例 url = 'http://127.0.0.1:1090/stream' # 替换为实际的事件流URL client = EventStreamClient(url, username='', password='') try:client.connect()for event in client.events():print(event) # 处理接收到的每个事件 finally:client.close()
输出: