为了使用Python实现WebSocket通讯和心跳控制,我们通常需要一个WebSocket客户端库和一个服务器端库。这里,我们将使用websockets库作为服务器和客户端的示例。
安装必要的库
首先,你需要安装websockets库。可以使用pip进行安装:
pip install websockets
1.服务器端代码
服务器端将处理WebSocket连接,发送和接收消息,并管理心跳检测。
import asyncio
import websockets
import time HEARTBEAT_INTERVAL = 10 # 心跳间隔(秒)
HEARTBEAT_TIMEOUT = 30 # 心跳超时(秒) # 心跳检测字典,存储客户端和它们最后活动时间
clients = {} async def heartbeat_task(ws, client_id): while True: try: # 如果客户端在HEARTBEAT_TIMEOUT内没有发送消息,则关闭连接 if time.time() - clients[client_id]['last_active'] > HEARTBEAT_TIMEOUT: print(f"Client {client_id} timed out, closing connection.") await ws.close() break # 发送心跳消息 await ws.send("heartbeat") print(f"Sent heartbeat to client {client_id}") await asyncio.sleep(HEARTBEAT_INTERVAL) except Exception as e: print(f"Heartbeat task for client {client_id} failed: {e}") break async def echo(websocket, path): client_id = id(websocket) # 使用内存地址作为简单的客户端ID clients[client_id] = {'ws': websocket, 'last_active': time.time()} # 启动心跳检测任务 asyncio.create_task(heartbeat_task(websocket, client_id)) try: async for message in websocket: clients[client_id]['last_active'] = time.time() # 更新最后活动时间 if message == "ping": print(f"Received ping from client {client_id}") await websocket.send("pong") else: print(f"Received '{message}' from client {client_id}") await websocket.send(f"Echo: {message}") except websockets.exceptions.ConnectionClosed: print(f"Client {client_id} disconnected") finally: # 清理客户端信息 del clients[client_id] start_server = websockets.serve(echo, "localhost", 8765) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()
2.客户端代码
客户端将连接到服务器,接收和发送消息,并响应心跳消息。
import asyncio
import websockets
import time async def client(): uri = "ws://localhost:8765" async with websockets.connect(uri) as websocket: while True: try: message = input("Enter message to send (or 'exit' to quit): ") if message == 'exit': break await websocket.send(message) response = await websocket.recv() print(f"Received: {response}") # 发送pong消息以响应心跳消息 if response == "heartbeat": await websocket.send("pong") # 模拟客户端工作,防止心跳超时 await asyncio.sleep(5) except websockets.exceptions.ConnectionClosed: print("Connection closed by server.") break asyncio.get_event_loop().run_until_complete(client())
3.运行和测试
首先运行服务器端代码。
然后运行客户端代码,并在提示时输入消息。
观察服务器和客户端的输出,确保它们能够正常通讯,并且心跳控制按预期工作。
注意:心跳检测的实现是基于一个简单的字典和内存地址作为客户端ID。在实际应用中,你可能需要使用更复杂的机制来跟踪客户端,如使用数据库或分布式缓存。此外,为了简化示例,心跳消息只是简单地发送字符串"heartbeat",并且客户端只是通过发送"pong"来响应。在真实场景中,你可能需要实现更复杂的逻辑来处理心跳。
关注公众号了解更多内容