argument('op');
        $op = $op ? $op : 'start';
        if ($op == 'start') {
            echo "socket start ...\n";
            $this->start();
        } else if ($op == 'stop') {
            echo "socket stop ...\n";
            $this->stop();
        }
    }

    /**
     * Create the WebSocket server, register its event handlers and run it.
     *
     * The server listens on 0.0.0.0; the port is read from the SOCKET_PORT
     * environment value and defaults to 6520. Any exception raised while
     * setting up or running the server is stored in Redis under
     * "caches:sockets:error" (third argument 600 — presumably a TTL in seconds).
     */
    public function start()
    {
        try {
            // env() may return a string; the server constructor expects an integer port.
            $port = (int) env('SOCKET_PORT', '6520');
            $this->ws = new \Swoole\WebSocket\Server("0.0.0.0", $port);

            // Connection opened (WebSocket handshake completed).
            $this->ws->on('open', [$this, 'open']);
            // Message frame received from a client.
            $this->ws->on('message', [$this, 'message']);
            // Plain HTTP request received on the same port.
            $this->ws->on('request', [$this, 'request']);
            // Connection closed.
            $this->ws->on('close', [$this, 'close']);

            $this->ws->start();
        } catch (\Exception $exception) {
            RedisService::set("caches:sockets:error", $exception->getMessage(), 600);
        }
    }

    /**
     * Handle a newly opened connection.
     *
     * Appends the connection fd to the list cached under "chats:fds" and
     * sends a confirmation message back to the client.
     *
     * @param mixed $ws      server instance passed by Swoole (unused; $this->ws is used instead)
     * @param mixed $request handshake request; only its fd property is read
     */
    public function open($ws, $request)
    {
        echo "连接成功:" . $request->fd . "\n";

        $fds = RedisService::get('chats:fds');
        // A missing or corrupted cache entry must not break the handshake:
        // appending to a non-array value would raise a fatal error.
        if (!is_array($fds)) {
            $fds = [];
        }
        $fds[] = $request->fd;
        RedisService::set('chats:fds', $fds, 86400);

        $this->pushJson($request->fd, ['message' => '连接成功:' . $request->fd]);
    }

    /**
     * Handle a message frame received from a client.
     *
     * A literal "ping" is answered with "pong". Anything else is decoded as
     * JSON; when it carries a from_uid, the sender's fd is bound to that id
     * under "chats:bind:{from_uid}", and the frame is then dispatched on its
     * "op" field ("chat", "order", anything else is echoed back).
     *
     * NOTE(review): exception messages are pushed to the client verbatim;
     * confirm that this cannot expose internal details.
     *
     * @param mixed $ws    server instance passed by Swoole (unused; $this->ws is used instead)
     * @param mixed $frame received frame; its fd and data properties are read
     * @return false|null false for a ping or a rejected chat message, null otherwise
     */
    public function message($ws, $frame)
    {
        try {
            // Heartbeat.
            if ($frame->data === 'ping') {
                $this->ws->push($frame->fd, 'pong');
                return false;
            }

            $decoded = $frame->data ? json_decode($frame->data, true) : [];
            if (!$decoded) {
                $this->pushJson($frame->fd, ['message' => 'no data']);
                return null;
            }

            // Valid JSON that is not an object/array (e.g. a bare number) carries no fields.
            $data = is_array($decoded) ? $decoded : [];

            $op = isset($data['op']) ? $data['op'] : '';
            // Client-supplied fields may be arrays/objects; only scalars are
            // accepted so that trim()/intval() never see a non-scalar value.
            $userId = trim((string) $this->scalarField($data, 'from_uid', ''));
            $chatKey = trim((string) $this->scalarField($data, 'chat_key', ''));
            $toUid = intval($this->scalarField($data, 'to_uid', 0));

            // Remember which connection and conversation the sender is currently on.
            if ($userId && $frame->fd) {
                $chatKey = $chatKey ? $chatKey : getChatKey($userId, $toUid);
                RedisService::set(
                    "chats:bind:{$userId}",
                    ['fd' => $frame->fd, 'user_id' => $userId, 'chat_key' => $chatKey],
                    86400
                );
            }

            // Strict comparison: a loose switch would route e.g. {"op": true} to "chat".
            if ($op === 'chat') {
                if (!$this->handleChat($frame, $data)) {
                    return false;
                }
            } elseif ($op === 'order') {
                $this->handleOrder($frame, $data, $toUid);
            } else {
                $this->pushJson($frame->fd, ['message' => $frame->data]);
            }
        } catch (\Exception $exception) {
            $this->pushJson($frame->fd, ['message' => $exception->getMessage()]);
        }

        return null;
    }

    /**
     * Process an "op = chat" frame: persist the message, echo the formatted
     * payload to the sender and forward it to the recipient when the
     * recipient is bound to the same conversation.
     *
     * @param mixed $frame received frame; only its fd property is read
     * @param array $data  decoded frame payload
     * @return bool false when the ids are invalid or saving failed, true otherwise
     */
    private function handleChat($frame, $data)
    {
        $toUid = intval($this->scalarField($data, 'to_uid', 0));
        $fromUid = intval($this->scalarField($data, 'from_uid', 0));
        $messageType = isset($data['message_type']) ? $data['message_type'] : 1;
        if ($toUid <= 0 || $fromUid <= 0) {
            return false;
        }

        // Read the error from the same instance that attempted the save.
        $chatService = ChatMessageService::make();
        if (!$chatService->saveData($data)) {
            $this->pushJson($frame->fd, ['message' => lang($chatService->getError())]);
            return false;
        }

        $toInfo = MemberService::make()->getInfo($toUid);
        $fromInfo = MemberService::make()->getInfo($fromUid);
        $message = isset($data['message']) ? $data['message'] : '';
        $chatKey = getChatKey($fromUid, $toUid);
        $now = time();

        $payload = [
            'type' => isset($data['type']) ? $data['type'] : 1,
            'message_type' => $messageType,
            'from_uid' => $fromUid,
            'from_username' => isset($fromInfo['username']) ? $fromInfo['username'] : '客服',
            'from_username_text' => isset($fromInfo['username']) ? format_account($fromInfo['username']) : '客服',
            'to_uid' => isset($data['to_uid']) ? $data['to_uid'] : 0,
            'to_username' => isset($toInfo['username']) ? $toInfo['username'] : '客服',
            'to_username_text' => isset($toInfo['username']) ? format_account($toInfo['username']) : '客服',
            'chat_key' => $chatKey,
            'message' => $message,
            'message_url' => $messageType == 2 && $message ? get_image_url($message) : '',
            'create_time' => $now,
            'time_text' => format_time($now - 1),
            'is_read' => 1,
            'status' => 1,
        ];
        $encoded = json_encode($payload, JSON_UNESCAPED_UNICODE);

        // Echo to the sender first.
        $this->ws->push($frame->fd, $encoded);

        // Forward to the recipient only while it is bound to this conversation.
        // NOTE(review): bindings are not removed when a connection closes, so
        // the stored fd may be stale; sendMsg() skips fds that are no longer
        // established WebSocket connections.
        $toBindData = RedisService::get("chats:bind:{$toUid}");
        $toFd = isset($toBindData['fd']) ? $toBindData['fd'] : 0;
        $toChatKey = isset($toBindData['chat_key']) ? $toBindData['chat_key'] : '';
        if ($toBindData && $toFd && $toChatKey == $chatKey) {
            $this->sendMsg($toFd, $encoded);
        }

        return true;
    }

    /**
     * Process an "op = order" frame: relay the raw frame to the connection
     * bound under "chats:bind:od_{to_uid}" and acknowledge to the sender
     * with both fds added to the payload.
     *
     * @param mixed $frame received frame; its fd and data properties are read
     * @param array $data  decoded frame payload
     * @param int   $toUid recipient id taken from the payload
     */
    private function handleOrder($frame, $data, $toUid)
    {
        $toBindData = RedisService::get("chats:bind:od_{$toUid}");
        $toFd = isset($toBindData['fd']) ? $toBindData['fd'] : 0;
        if ($toFd) {
            $this->sendMsg($toFd, $frame->data);
        }

        $data['fd'] = $frame->fd;
        $data['to_fd'] = $toFd;
        $this->pushJson($frame->fd, $data);
    }

    /**
     * Return $data[$key] when it is present and scalar, otherwise $default.
     *
     * @param array      $data    decoded frame payload
     * @param string     $key     field name
     * @param int|string $default value used when the field is absent or not scalar
     * @return bool|float|int|string
     */
    private function scalarField($data, $key, $default)
    {
        return (isset($data[$key]) && is_scalar($data[$key])) ? $data[$key] : $default;
    }

    /**
     * JSON-encode $payload (keeping non-ASCII characters unescaped) and push it to $fd.
     *
     * @param int   $fd      target connection
     * @param array $payload data to encode
     * @return mixed result of the underlying push() call
     */
    private function pushJson($fd, $payload)
    {
        return $this->ws->push($fd, json_encode($payload, JSON_UNESCAPED_UNICODE));
    }

    /**
     * Handle a plain HTTP request. Intentionally left empty.
     *
     * @param mixed $request
     * @param mixed $response
     */
    public function request($request, $response)
    {
    }

    /**
     * Push raw data to a connection if it is still an established WebSocket client.
     *
     * @param int|string $fd   target connection
     * @param string     $data frame body to send
     * @return bool true when the frame was handed to the server, false otherwise
     */
    public function sendMsg($fd, $data)
    {
        try {
            $fd = (int) $fd;
            // push() on a closed or non-WebSocket fd fails without throwing,
            // so check the connection state explicitly.
            if (!$this->ws || $fd <= 0 || !$this->ws->isEstablished($fd)) {
                return false;
            }

            return (bool) $this->ws->push($fd, $data);
        } catch (\Exception $exception) {
            return false;
        }
    }

    /**
     * Handle a closed connection.
     *
     * Removes the fd from the list cached under "chats:fds". The connection
     * itself is not closed again here, because this callback runs while it is
     * already being closed.
     *
     * @param mixed      $ws server instance passed by Swoole (unused)
     * @param int|string $fd closed connection
     */
    public function close($ws, $fd = '')
    {
        echo '连接关闭:' . $fd . "\n";

        $fds = RedisService::get('chats:fds');
        if (is_array($fds) && $fds) {
            $remaining = [];
            foreach ($fds as $item) {
                // Loose comparison: the cached value may come back as a string.
                if ($item != $fd) {
                    $remaining[] = $item;
                }
            }
            RedisService::set('chats:fds', $remaining, 86400);
        }
    }

    /**
     * Shut the server down.
     *
     * Only has an effect in a process where start() created the server;
     * otherwise $this->ws is empty and nothing happens.
     */
    public function stop()
    {
        if ($this->ws) {
            // close() targets a single connection and requires its fd;
            // shutdown() is the call that stops the whole server.
            $this->ws->shutdown();
        }
    }
}