问题描述
客户要求我们开发一个后台实时定位系统,该系统能够实时监控客户系统里面会员所在的位置,并将会员的位置信息显示在地图上。服务器后端是PHP开发的后台,主要是讲究效率。会员展示的前端是一个微信小程序,那么,前端可通过微信小程序提供的wx.startLocationUpdateBackground、wx.onLocationChange来实时获取当前的位置信息。
具体构思如下:
- 在会员进入相应的小程序页面时,调用wx.startLocationUpdateBackground方法,开启后台定位功能。
- 当会员的位置发生变化时,会触发wx.onLocationChange事件,我们可以获取到当前的位置信息,并使用地图API将会员的位置提交到API服务。
- API服务接收到会员的位置信息后,将会员的位置信息使用redis.publish方法发布到指定的频道,供后台实时定位系统接收。
- 使用docker服务部署一个python服务,该服务监听redis的指定频道,并接收到会员的位置信息后,判断是否有操作员在监听该会员的位置信息,如果有,则将会员的位置信息发送给操作员。
- 操作员客户端收到会员的位置信息后,将会员的位置信息显示在地图上。
下面是具体的实现步骤:
- 准备工作:申请微信小程序的appid和secret,并在微信小程序后台配置好后台定位功能。其实在测试阶段,这些wx.startLocationUpdateBackground、wx.onLocationChange在体验版中都是开放的,但在正式版中需要申请权限。我们可以先在体验版中测试后台定位功能是否正常工作。
- 小程序端:在小程序端,我们需要调用wx.startLocationUpdateBackground方法开启后台定位功能,并在wx.onLocationChange事件中获取当前的位置信息。
methods: {locationListener() {uni.startLocationUpdateBackground({success: () => {uni.onLocationChange((res) => {this.$api.sendRequest({url: '/api/location/move',data: res})});},fail: err => {console.log('监听位置失败 -> ', err);}})},locationStart() {uni.getSetting({success: (res) => {if (res.authSetting['scope.userLocationBackground']) {this.locationListener();} else {uni.authorize({scope: 'scope.userLocationBackground',success: () => {this.locationListener();},fail: () => {uni.showModal({title: '提示',content: '授权失败,点击右上角设置位置为使用时和离开后!',success: r => {if (r.confirm) {uni.openSetting({complete: t => {if (t.authSetting['scope.userLocationBackground']) {this.locationListener();} else {console.log('授权失败 ->', t);}}})}}})}})}}})}
}
- 修改mainifest.json文件,添加后台定位权限
"mp-weixin" : {"permissions": {"scope.userLocationBackground" : {"desc" : "为了给您提供更精准、便捷的服务,我们需要获取您的后台持续获取位置信息。我们承诺会严格保护您的隐私,仅在实现上述功能时使用您的位置信息,感谢您的理解与支持!"},"scope.userLocation" : {"desc" : "为了为您提供更好更精准的服务,需要获得您当前位置信息。"}},"requiredPrivateInfos" : ["onLocationChange","startLocationUpdateBackground"],"requiredBackgroundModes" : [ "location" ],
}
- PHP服务端
Cache::store('redis')->publish("location_update", json_encode(['uid' => $uid,'longitude' => input('longitude', ''),'latitude' => input('latitude', ''),
]));
- Python服务端 (这里需要注意:访问的redis服务端是宿主机的redis,所以需要将redis的host设置为宿主机的IP地址,否则会出现连接失败的情况,因为服务器的python环境是python2。如果服务器的python本身就是python3,不需要构建docker镜像,直接运行即可。)
#!/usr/bin/env python3
import asyncio
import json
import logging
from redis.asyncio import Redis
from redis.exceptions import RedisError
from websockets.server import serve, WebSocketServerProtocol
from typing import Set, Dict
from datetime import datetime# 设置更详细的日志格式
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)class LocationServer:def __init__(self):self.connections: Dict[str, Set[WebSocketServerProtocol]] = {}self.redis = Noneself.pubsub = Noneself.redis_config = {'host': '127.0.0.1','port': 6379,'password': None,'db': 0,'decode_responses': True,'retry_on_timeout': True,'socket_keepalive': True}def log_connections_status(self):"""记录当前连接状态"""total_connections = sum(len(clients) for clients in self.connections.values())logger.info(f"Current connections status:")logger.info(f"Total connected clients: {total_connections}")for uid, clients in self.connections.items():logger.info(f"Member {uid}: {len(clients)} client(s)")async def init_redis(self):max_retries = 5retry_count = 0while retry_count < max_retries:try:if self.redis:await self.redis.close()self.redis = Redis(**self.redis_config)self.pubsub = self.redis.pubsub()await self.redis.ping()logger.info("Redis connection established")return Trueexcept Exception as e:retry_count += 1logger.error(f"Redis connection attempt {retry_count} failed: {e}")if retry_count < max_retries:await asyncio.sleep(5)else:logger.error("Max retries reached, could not connect to Redis")return Falseasync def redis_subscriber(self):while True:try:if self.redis is None or not await self.redis.ping():logger.info("Trying to connect to Redis...")if not await self.init_redis():await asyncio.sleep(5)continueawait self.pubsub.subscribe('location_update')logger.info("Successfully subscribed to location_update channel")while True:message = await self.pubsub.get_message(ignore_subscribe_messages=True)if message and message['type'] == 'message':try:data = json.loads(message['data'])uid = str(data.get('uid'))logger.info(f"Received location update for member {uid}")if uid in self.connections and self.connections[uid]:client_count = len(self.connections[uid])logger.info(f"Found {client_count} client(s) for member {uid}")disconnected = set()success_count = 0for websocket in self.connections[uid]:try:await websocket.send(json.dumps(data))success_count += 1logger.info(f"Successfully sent location update to a client for member {uid}")except Exception as e:logger.error(f"Failed to send to websocket: {e}")disconnected.add(websocket)for ws in disconnected:self.connections[uid].remove(ws)logger.info(f"Removed disconnected client for member {uid}")if not self.connections[uid]:del self.connections[uid]logger.info(f"Removed member {uid} as no clients remaining")logger.info(f"Location update sent to {success_count}/{client_count} clients for member {uid}")else:logger.info(f"No clients found for member {uid}")except json.JSONDecodeError as e:logger.error(f"Invalid JSON message: {e}")await asyncio.sleep(0.1)except RedisError as e:logger.error(f"Redis error: {e}")await asyncio.sleep(5)except Exception as e:logger.error(f"Unexpected error: {e}")await asyncio.sleep(5)async def register(self, websocket: WebSocketServerProtocol, uid: str):"""注册新的WebSocket连接"""if uid not in self.connections:self.connections[uid] = set()self.connections[uid].add(websocket)logger.info(f"New connection registered for uid: {uid}")self.log_connections_status()async def unregister(self, websocket: WebSocketServerProtocol, uid: str):"""注销WebSocket连接"""if uid in self.connections:self.connections[uid].remove(websocket)if not self.connections[uid]:del self.connections[uid]logger.info(f"Connection unregistered for uid: {uid}")self.log_connections_status()async def handler(self, websocket: WebSocketServerProtocol):"""处理WebSocket连接"""client_address = websocket.remote_addresslogger.info(f"New WebSocket connection from {client_address}")uid = Nonetry:message = await websocket.recv()logger.info(f"Received initial message from {client_address}: {message}")data = json.loads(message)uid = str(data.get('uid'))if not uid:logger.warning(f"No uid provided from {client_address}")await websocket.close(1002, "uid required")returnawait self.register(websocket, uid)logger.info(f"Client {client_address} registered for member {uid}")try:async for message in websocket:logger.debug(f"Received message from {client_address}: {message}")except Exception as e:logger.error(f"Error in websocket message loop for {client_address}: {e}")finally:if uid:await self.unregister(websocket, uid)logger.info(f"Client {client_address} unregistered for member {uid}")except Exception as e:logger.error(f"WebSocket handler error for {client_address}: {e}")if uid:await self.unregister(websocket, uid)async def main():server = LocationServer()logger.info("Starting Location WebSocket Server...")subscriber_task = asyncio.create_task(server.redis_subscriber())logger.info("Redis subscriber task created")async with serve(server.handler, "0.0.0.0", 30582):logger.info("WebSocket server is running on ws://0.0.0.0:30582")try:await asyncio.Future()except Exception as e:logger.error(f"Server error: {e}")finally:subscriber_task.cancel()logger.info("Server shutting down...")if __name__ == "__main__":asyncio.run(main())
- 操作员客户端
const ws = new WebSocket('ws://demo.com:30582');ws.onopen = () => {console.log('WebSocket 已连接');
}; ws.onmessage = (event) => { const data = JSON.parse(event.data);// 刷新地图显示当前位置consol.log("当前位置发生变化 ->" + data);// 这里可以进行地图画线、标注等操作,每个人使用的地图不一样,需要将经纬度转换为自己使用的坐标系
};ws.onclose = (event) => {console.log('WebSocket 已关闭');
};ws.onerror = (event) => {console.log('WebSocket 发生错误');
};
总结
通过以上步骤,我们可以开发出一个后台实时定位系统,该系统能够实时监控客户系统里面会员所在的位置,并将会员的位置信息显示在地图上。服务器后端是PHP开发的后台,最终实现成功也就那么一会,不需要太多的技术难度,比如安装各种库、各种配置、各种环境,仅需要在python运行前运行 pip install websockets redis 即可。调试时,有错误直接修改代码即可,不需要重启服务。