1.安装workman扩展
composer require workerman/workerman
2.安装think-worker
composer require topthink/think-worker=1.0.*
3.如果在windows下使用,还需下载
composer require workerman/workerman-for-win
4.根目录创建server.php 。用来启动服务
#!/usr/bin/env php <?php define('APP_PATH', __DIR__ . '/application/'); define('BIND_MODULE','push/Worker'); // 加载框架引导文件 require __DIR__ . '/thinkphp/start.php';
5.新建Worker.php 服务器要记得开发端口和换域名;127.0.0.1换为0.0.0.0

1.安装workman扩展composer require workerman/workerman 2.安装think-workercomposer require topthink/think-worker=1.0.* 3.如果在windows下使用,还需下载 composer require workerman/workerman-for-win 4.根目录创建server.php 。用来启动服务 #!/usr/bin/env php <?php define('APP_PATH', __DIR__ . '/application/'); define('BIND_MODULE','push/Worker'); // 加载框架引导文件 require __DIR__ . '/thinkphp/start.php'; 5.新建Worker.php 服务器要记得开发端口和换域名;127.0.0.1换为0.0.0.0<?phpnamespace app\push\controller;use think\worker\Server; use Workerman\Lib\Timer; use think\Log; class Worker extends Server {protected $socket = 'websocket://127.0.0.1:2346';protected $uidConnections = [];protected $HEARTBEAT_TIME = '300';public function _initialize(){Log::init(['type' => 'File','apart_level' => ['API'], 'path' => '../runtime/log/']);}/*** 收到信息* @param $connection* @param $data*/public function onMessage($connection, $data){// 给connection临时设置一个lastMessageTime属性,用来记录上次收到消息的时间$connection->lastMessageTime = time();$data = json_decode($data,false);//接收到消息进行逻辑处理 修改状态switch($data->type){case 'login':// 保存该用户的输送数据$this->uidConnections[$data->uid] = $connection;$connection->send('所有用户等会收到的信息');$this->send_uid($data->uid,"通过保存uid:{$data->uid}为给你发的信息");break;case 'ping':// 心跳$connection->send('pong');break;case 'sendMessage':// 发送所有消息$this->send_all($data->message,$data->uid);break;case 'sendUser' :// 发送单个消息$this->send_uid($data->uid,$data->message);break;}}/*** 当连接建立时触发的回调函数* @param $connection*/public function onConnect($connection){$connection->send('已建立连接');}/*** 当连接断开时触发的回调函数* @param $connection*/public function onClose($connection){$connection->send('连接断开');}/*** 当客户端的连接上发生错误时触发* @param $connection* @param $code* @param $msg*/public function onError($connection, $code, $msg){echo "error $code $msg\n";}/*** 每个进程启动* @param $worker*/public function onWorkerStart($worker){// 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符$inner_text_worker = new \Workerman\Worker('Text://127.0.0.1:5678');$inner_text_worker->onMessage = function ($connection, $buffer){$buffer = json_decode($buffer,false);switch($buffer->type){case 'sendMessage' :$res = $this->send_all($buffer->message);break;case 'sendUser' :$res = $this->send_uid($buffer->uid,$buffer->message);break;default:$res = $this->send_all('1111');}$connection->send($res ? 'ok' : 'fail');};$inner_text_worker->listen();Timer::add($this->HEARTBEAT_TIME, function()use($worker){$time_now = time();foreach($worker->connections as $connection) {// 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间if (empty($connection->lastMessageTime)) {$connection->lastMessageTime = $time_now;continue;}$diff_time = $time_now - $connection->lastMessageTime;$msg = '距离上次通话已经过去'.$diff_time.'秒';$connection->send($msg);// 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接if ($time_now - $connection->lastMessageTime > $this->HEARTBEAT_TIME) {$connection->close();}}});}public function send_uid($uid,$message){if(isset($this->uidConnections[$uid])){// 获取之前用户的链接$conn = $this->uidConnections[$uid];//echo $uid.PHP_EOL;$conn->send($message);Log::write("消息发送给 UID $uid: $message", 'API'); // 添加调试日志return true;}Log::write("UID $uid 不存在", 'API'); // 添加调试日志return false;}public function send_all($message,$uid = ''){foreach($this->uidConnections as $conn){if(!empty($uid)){//不推送给自己if($conn != $this->uidConnections[$uid]){$conn->send($message);}}else{$conn->send($message);}}return true;} }
6.根目录下执行命令,运行服务
php server.php start
7.新加test.html 用来接受测试,记得将127.0.0.1换为域名

<!DOCTYPE html> <html lang="en"> <head><meta charset="UTF-8"><title>Title</title><script>// 创建一个Socket实例var socket = new WebSocket('ws://127.0.0.1:2346');var uid = "33";//心跳间隔var heartbeatInterval = 290 * 1000;// 打开Socketsocket.onopen = function(event) {// 发送一个初始化消息 socket.send(JSON.stringify({type: "login",uid: uid,}));// 启动心跳定时器 startHeartbeat();};socket.onmessage = function(event) {console.log('收到消息',event.data);// 处理收到的心跳回复if (event.data === 'pong') {console.log('心跳回复');}};// 监听Socket的关闭socket.onclose = function(event) {console.log('关闭监听',event);// 连接关闭后,清除心跳定时器 stopHeartbeat();};// 心跳定时器var heartbeatTimer;function startHeartbeat() {heartbeatTimer = setInterval(function() {socket.send(JSON.stringify({ type: 'ping' }));console.log('发送心跳');}, heartbeatInterval);}function stopHeartbeat() {if (heartbeatTimer) {clearInterval(heartbeatTimer);heartbeatTimer = null;}}function send(){var val = $("#msg").val();if(val==''){alert('请输入发送内容');return false;}socket.send(JSON.stringify({type: "sendMessage",uid: uid,message:val}));}</script> </head> <body> <input type="text" id="msg"> <button onclick="send()">发送消息</button> </body> <script src="https://code.jquery.com/jquery-3.1.1.min.js"></script> </html>
8.手动推送消息

public static function push_msg($str = '',$uid = '33' ){// $uid = input('uid','33');$type = input('type','sendUser');$msg = input('msg','hello word');$client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);if (!$client) {echo "Error: $errmsg\n";return false;}// 推送的数据,包含用户,表示是给这个用户推送//有数据则进行推送if($str){$data = array('uid'=>$uid, 'message'=>$str,'type'=>$type);// 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符$bytesWritten = fwrite($client, json_encode($data) . "\n");if ($bytesWritten === false) {Log::write('Failed to write data to the socket', 'API');fclose($client);return false;}stream_set_timeout($client, 1);// 读取推送结果$response = fread($client, 8192);fclose($client);return $response;}}
结果:
原文地址:https://blog.csdn.net/qq_43630915/article/details/140843610