argument('op');
        $op = $op ? $op : 'start';
        if ($op == 'start') {
            echo "swoole socket service start ...\n";
            $this->start();
        } else if ($op == 'stop') {
            echo "swoole socket service stop ...\n";
            $this->stop();
        }
    }

    /**
     * Create the WebSocket server, register its event handlers and start it.
     *
     * The server binds to 0.0.0.0 on the port taken from the SOCKET_PORT env
     * value (8650 when unset). An exception raised while creating or starting
     * the server is stored under "caches:sockets:error" for 600 seconds and
     * written to the command output.
     */
    public function start()
    {
        try {
            // env() can return the port as a string; the server constructor expects an integer port.
            $port = (int) env('SOCKET_PORT', '8650');
            $this->ws = new \swoole_websocket_server("0.0.0.0", $port);

            // Connection opened (WebSocket handshake completed).
            $this->ws->on('open', [$this, 'open']);
            // Message frame received from a client.
            $this->ws->on('message', [$this, 'message']);
            // Plain HTTP request received on the same port (handler is currently a no-op).
            $this->ws->on('request', [$this, 'request']);
            // Connection closed.
            $this->ws->on('close', [$this, 'close']);

            $this->ws->start();
        } catch (\Exception $exception) {
            $date = date('Y-m-d H:i:s');
            RedisService::set("caches:sockets:error", $exception->getMessage(), 600);
            $this->info("【{$date}】Socket:运行异常=》" . $exception->getMessage());
        }
    }

    /**
     * Handle a newly opened connection.
     *
     * Adds the connection's fd to the "caches:sockets:fds" list (kept for
     * 86400 seconds) and pushes a "conn" acknowledgement back to the client.
     *
     * @param $ws      Server instance passed by the event (unused; $this->ws is used instead)
     * @param $request Handshake request; only its fd property is read
     */
    public function open($ws, $request)
    {
        $date = date('Y-m-d H:i:s');

        $fds = RedisService::get('caches:sockets:fds');
        if (!is_array($fds)) {
            // Missing or unexpected cache content: start a fresh list.
            $fds = [];
        }
        $fds[] = $request->fd;

        // array_unique() keeps the original keys; re-index so the stored value stays a plain list.
        RedisService::set('caches:sockets:fds', array_values(array_unique($fds)), 86400);

        $this->ws->push(
            $request->fd,
            json_encode(['op' => 'conn', 'message' => '连接成功', 'fd' => $request->fd], JSON_UNESCAPED_UNICODE)
        );
        $this->info("【{$date}】Socket:客户端【{$request->fd}】连接成功");
    }

    /**
     * Handle an incoming message frame.
     *
     * A literal "ping" frame is answered with "pong". Any other frame is
     * decoded as JSON and dispatched on its "op" field:
     *  - "bind":       stores the payload under "caches:{uuid}:bind".
     *  - "login-bind": stores the payload, then broadcasts a "login-load"
     *                  message to every fd listed in "caches:sockets:fds"
     *                  that is an established WebSocket connection.
     *  - anything else: stores the payload under "caches:{uuid}:{op}".
     * Finally an "ok" acknowledgement is pushed to the sender. If an exception
     * is thrown, its message is pushed to the sender instead.
     *
     * @param $ws    Server instance passed by the event (unused; $this->ws is used instead)
     * @param $frame Received frame; its fd and data properties are read
     * @return false|void false after answering a heartbeat, nothing otherwise
     */
    public function message($ws, $frame)
    {
        try {
            $date = date('Y-m-d H:i:s');

            // Heartbeat: reply and stop, nothing is stored.
            if ($frame->data === 'ping') {
                $this->ws->push($frame->fd, 'pong');
                $this->info("【{$date}】Socket:客户端【{$frame->fd}】心跳包");
                return false;
            }

            // Decode the payload; anything that is not a JSON object/array is treated as empty.
            $data = $frame->data ? json_decode($frame->data, true) : [];
            if (!is_array($data)) {
                $data = [];
            }

            $fromId = $frame->fd;
            $op = isset($data['op']) ? $data['op'] : '';
            $uuid = isset($data['uuid']) ? $data['uuid'] : uniqid();
            $fromData = isset($data['data']) ? $data['data'] : '';
            $ttl = 7 * 24 * 3600;

            switch ($op) {
                case 'bind':
                    // NOTE: the stored key is spelled 'formId'; it is kept as-is because readers of this cache are not visible here.
                    RedisService::set(
                        "caches:{$uuid}:{$op}",
                        ['uuid' => $uuid, 'qrcode' => $fromData, 'formId' => $fromId, 'date' => $date],
                        $ttl
                    );
                    break;

                case 'login-bind':
                    $record = ['uuid' => $uuid, 'data' => $fromData, 'formId' => $fromId, 'date' => $date];
                    RedisService::set("caches:binds:login", $record, $ttl);
                    RedisService::set("caches:{$uuid}:{$op}", $record, $ttl);
                    RedisService::set("caches:fds", $this->ws->connections, 600);

                    $fds = RedisService::get('caches:sockets:fds');
                    if ($fds && is_array($fds)) {
                        // The payload comes from the client: only iterate when it really is a key/value map.
                        $entries = is_array($fromData) ? $fromData : [];
                        $localData = [];
                        foreach ($entries as $k => $v) {
                            $localData[] = [
                                'key' => $k,
                                // Values are expected to be JSON strings; an already-decoded array is passed
                                // through, because json_decode() raises a TypeError for arrays on PHP 8 and
                                // that is not caught by the \Exception handler below.
                                'value' => is_array($v) ? $v : json_decode($v, true),
                            ];
                        }

                        // Same payload for every recipient, so encode it once.
                        $broadcast = json_encode(
                            ['op' => 'login-load', 'uuid' => $uuid, 'fd' => $fromId, 'data' => $localData, 't' => time()],
                            JSON_UNESCAPED_UNICODE
                        );

                        foreach ($fds as $fd) {
                            // Only push to fds that are established WebSocket connections, otherwise push may fail.
                            if ($this->ws->isEstablished($fd)) {
                                $this->ws->push($fd, $broadcast);
                                $this->info("【{$date}】Socket:客户端【{$fd}】推送广播消息");
                            }
                        }
                    }
                    break;

                default:
                    RedisService::set(
                        "caches:{$uuid}:{$op}",
                        ['uuid' => $uuid, 'data' => $fromData, 'formId' => $fromId, 'date' => $date],
                        $ttl
                    );
                    break;
            }

            // Acknowledge the message to the sender.
            $this->ws->push($fromId, json_encode(['op' => $op, 'message' => 'ok', 't' => time()]));
            $this->info("【{$date}】Socket:客户端【{$fromId}】消息处理成功");
        } catch (\Exception $exception) {
            $this->ws->push($frame->fd, json_encode(['message' => $exception->getMessage()], JSON_UNESCAPED_UNICODE));
        }
    }

    /**
     * Handle a plain HTTP request. Intentionally empty: nothing is written to the response here.
     *
     * @param $request
     * @param $response
     */
    public function request($request, $response)
    {
    }

    /**
     * Push data to a single connection.
     *
     * @param $fd   Target connection fd
     * @param $data Data handed to the server's push() unchanged
     * @return bool Result of push(); false when push() throws
     */
    public function sendMsg($fd, $data)
    {
        try {
            return $this->ws->push($fd, $data);
        } catch (\Exception $exception) {
            return false;
        }
    }

    /**
     * Handle a closed connection.
     *
     * Deletes the "caches:server:{fd}" key and removes the fd from the
     * "caches:sockets:fds" list that open() maintains.
     *
     * @param $ws Server instance passed by the event (unused)
     * @param $fd Fd of the connection that was closed
     */
    public function close($ws, $fd = '')
    {
        $date = date('Y-m-d H:i:s');
        RedisService::keyDel("caches:server:" . $fd);

        // Drop the fd from the broadcast list so the list does not accumulate dead connections.
        $fds = RedisService::get('caches:sockets:fds');
        if ($fds && is_array($fds)) {
            RedisService::set('caches:sockets:fds', array_values(array_diff($fds, [$fd])), 86400);
        }

        $this->info("【{$date}】Socket:客户端【{$fd}】连接关闭");
        // No $this->ws->close($fd) here: this callback runs because the connection is already
        // being closed, so closing it again is redundant.
    }

    /**
     * Stop the server held by this instance, if there is one.
     *
     * NOTE(review): $this->ws is only assigned in start(); when the command is run
     * with op=stop in a separate process this is presumably a no-op — confirm how
     * the running server is expected to be stopped.
     */
    public function stop()
    {
        if ($this->ws) {
            $date = date('Y-m-d H:i:s');
            $this->info("【{$date}】Socket:停止运行服务");
            // close() closes one client connection and requires its fd; shutdown() is the call that stops the server.
            $this->ws->shutdown();
        }
    }
}