argument('op');
        // (Tail of a method whose beginning lies before this section; logic left untouched.)
        $op = $op ? $op : 'start';
        if ($op == 'start') {
            echo "swoole socket service start ...\n";
            $this->start();
        } else if ($op == 'stop') {
            echo "swoole socket service stop ...\n";
            $this->stop();
        }
    }

    /**
     * Create the WebSocket server, register the event handlers and start it.
     *
     * Listens on 0.0.0.0 at the port given by the SOCKET_PORT env value
     * (default 6530). Any exception raised while starting is written to
     * Redis under "caches:sockets:error" and logged.
     */
    public function start()
    {
        try {
            // Create the WebSocket server object; the port is cast to int because
            // env() yields a string and the server constructor expects an integer.
            $this->ws = new \Swoole\WebSocket\Server("0.0.0.0", (int) env('SOCKET_PORT', '6530'));

            // Connection opened
            $this->ws->on('open', [$this, 'open']);
            // WebSocket frame received
            $this->ws->on('message', [$this, 'message']);
            // Plain HTTP request received on the same port
            $this->ws->on('request', [$this, 'request']);
            // Connection closed
            $this->ws->on('close', [$this, 'close']);

            $this->ws->start();
        } catch (\Exception $exception) {
            $date = date('Y-m-d H:i:s');
            RedisService::set("caches:sockets:error", $exception->getMessage(), 600);
            $this->info("【{$date}】Socket:运行异常=》" . $exception->getMessage());
        }
    }

    /**
     * Connection-open handler: greets the client with its connection id.
     *
     * @param $ws      server instance passed by the event dispatcher (unused; $this->ws is used instead)
     * @param $request handshake request; only its fd is read
     */
    public function open($ws, $request)
    {
        $date = date('Y-m-d H:i:s');
        $payload = [
            'success' => 'true', // string, not boolean: kept as-is because clients may compare against it
            'op' => 'conn',
            'message' => '连接成功',
            'fd' => $request->fd,
        ];
        $this->ws->push($request->fd, json_encode($payload, JSON_UNESCAPED_UNICODE));
        $this->info("【{$date}】Socket:客户端【{$request->fd}】连接成功");
    }

    /**
     * Frame handler: heartbeat, identity binding and the "chat" / "live" / "login" operations.
     *
     * The frame payload is expected to be either the literal string "ping" or a
     * JSON object with keys such as from_uid, to_uid, token, op, scene, chat_key,
     * uuid, message, msg_type, chat_type, goods_id and live_id.
     *
     * @param $ws    server instance passed by the event dispatcher (unused; $this->ws is used instead)
     * @param $frame received frame; its fd and data are read
     * @return false|void false when the frame is a heartbeat or is rejected
     */
    public function message($ws, $frame)
    {
        $date = date('Y-m-d H:i:s');

        // Decode once; the decoded value is also what marks this fd as alive for sendMsg().
        $decoded = json_decode($frame->data, true);
        RedisService::set("chats:frames:" . $frame->fd, $decoded, 86400);

        // Heartbeat
        if ($frame->data == 'ping') {
            $this->ws->push($frame->fd, 'pong');
            $this->info("【{$date}】Socket:客户端【{$frame->fd}】心跳包", false);
            return false;
        }

        // Message processing. Anything that is not a JSON object/array is treated as an empty payload.
        $frameId = $frame->fd;
        $data = is_array($decoded) ? $decoded : [];
        $fromUid = isset($data['from_uid']) ? intval($data['from_uid']) : 0;
        $token = isset($data['token']) ? $data['token'] : '';
        $op = isset($data['op']) ? $data['op'] : '';
        $scene = isset($data['scene']) && $data['scene'] ? $data['scene'] : 'chat';

        // NOTE(review): loose comparison — if verifyToken() yields a falsy value on failure
        // and from_uid is absent (0), this check passes. Confirm whether that is intended.
        $jwt = new Jwt();
        $userId = $jwt->verifyToken($token);
        if ($userId != $fromUid) {
            $this->info("【{$scene} {$date}】Socket:签名失败【{$frameId}-{$fromUid}】");
            return false;
        }

        $uuid = isset($data['uuid']) ? $data['uuid'] : uniqid();
        $toUid = isset($data['to_uid']) ? intval($data['to_uid']) : 0;
        $apiUrl = env('APP_URL', '');

        try {
            // Inside the try block so that a non-string chat_key (TypeError from trim) is caught below.
            $chatKey = isset($data['chat_key']) ? trim($data['chat_key']) : '';
            $chatKey = $chatKey ? $chatKey : getChatKey($fromUid, $toUid);

            // Remember which fd belongs to this user for later pushes
            if ($fromUid && $frameId) {
                $fds = RedisService::get("chats:bindFds:{$chatKey}");
                $fds = $fds ? $fds : [];
                $fds[$op . '_' . $fromUid] = $frameId;
                RedisService::set("chats:bindFds:{$chatKey}", $fds, 86400);
                RedisService::set("chats:bind:{$scene}_{$fromUid}", ['fd' => $frameId, 'scene' => $scene, 'user_id' => $fromUid, 'uuid' => $uuid, 'chat_key' => $chatKey], 86400);
            }

            switch ($op) {
                case 'chat': // text / image chat
                    $msgType = isset($data['msg_type']) ? $data['msg_type'] : 1;
                    $chatType = isset($data['chat_type']) ? $data['chat_type'] : 1;
                    $message = isset($data['message']) ? trim($data['message']) : '';

                    // Parameter validation
                    if ($toUid <= 0 || $fromUid <= 0 || empty($message)) {
                        $this->info("【{$scene} {$date}】Socket:参数错误,from@{$fromUid}-to@{$toUid}。");
                        $this->sendMsg($frameId, ['success' => false, 'op' => 'push', 'scene' => $scene, 'data' => $data, 'message' => '参数错误']);
                        return false;
                    }

                    // Both parties must resolve to a member record with status 1
                    $fromUserName = $fromAvatar = '';
                    $toUserName = $toAvatar = '';
                    $fromInfo = MemberService::make()->getCacheInfo(['id' => $fromUid, 'status' => 1]);
                    if (empty($fromInfo)) {
                        $this->info("【{$scene} {$date}】Socket:发送用户不存在,from@{$fromUid}-to@{$toUid}。");
                        $this->sendMsg($frameId, ['success' => false, 'op' => 'push', 'scene' => $scene, 'data' => $data, 'message' => '您的账号不可用或已冻结,请联系客服']);
                        return false;
                    }

                    $toInfo = MemberService::make()->getCacheInfo(['id' => $toUid, 'status' => 1]);
                    if (empty($toInfo)) {
                        $this->info("【{$scene} {$date}】Socket:接收用户不存在,from@{$fromUid}-to@{$toUid}。");
                        $this->sendMsg($frameId, ['success' => false, 'op' => 'push', 'scene' => $scene, 'data' => $data, 'message' => '对方账号不可用或无法接收消息']);
                        return false;
                    }

                    // Names and avatars are only filled in for chat_type 1
                    if ($chatType == 1) {
                        $fromUserName = isset($fromInfo['nickname']) ? $fromInfo['nickname'] : $fromUid;
                        $fromAvatar = isset($fromInfo['avatar']) ? $fromInfo['avatar'] : '';
                        $toUserName = isset($toInfo['nickname']) ? $toInfo['nickname'] : $toUid;
                        $toAvatar = isset($toInfo['avatar']) ? $toInfo['avatar'] : '';
                    }

                    $msgData = [
                        'from_uid' => $fromUid,
                        'to_uid' => $toUid,
                        'type' => 9,
                        'msg_type' => $msgType,
                        'chat_type' => $chatType,
                        'from_user_name' => $fromUserName,
                        'to_user_name' => $toUserName,
                        'from_user_avatar' => $fromAvatar,
                        'to_user_avatar' => $toAvatar,
                        'description' => $msgType == 1 ? mb_substr($message, 0, 20) : '',
                        'content' => $message,
                        'goods_id' => isset($data['goods_id']) ? intval($data['goods_id']) : 0,
                        'live_id' => isset($data['live_id']) ? intval($data['live_id']) : 0,
                        'chat_key' => $chatKey,
                        'create_time' => time(),
                        'update_time' => time(),
                        'is_read' => 2,
                        'status' => 1
                    ];

                    // Persist before pushing
                    if (!MessageModel::insertGetId($msgData)) {
                        $this->sendMsg($frameId, ['success' => false, 'op' => 'push', 'scene' => $scene, 'data' => $msgData, 'message' => '消息发送失败']);
                        return false;
                    }

                    // Decorate the stored row for display before pushing it out
                    $msgData['from_user_avatar'] = get_image_url($msgData['from_user_avatar'], $apiUrl);
                    $msgData['to_user_avatar'] = get_image_url($msgData['to_user_avatar'], $apiUrl);
                    $msgData['content'] = $msgType == 2 ? get_images_preview(json_decode($msgData['content'], true), '', $apiUrl) : $msgData['content'];
                    $msgData['time_text'] = dateFormat($msgData['create_time']);
                    $msgData['goods'] = [];
                    $msgData['live_info'] = [];

                    // Attach live-room information when the message references one
                    if ($msgData['live_id']) {
                        $info = LiveModel::with(['member'])->where(['id' => $msgData['live_id'], 'mark' => 1])
                            ->select(['id', 'user_id', 'play_url', 'description', 'status'])
                            ->first();
                        $info = $info ? $info->toArray() : [];
                        if ($info) {
                            $member = isset($info['member']) ? $info['member'] : [];
                            $member['avatar'] = isset($member['avatar']) && $member['avatar'] ? get_image_url($member['avatar'], $apiUrl) : get_image_url('/images/member/logo.png', $apiUrl);
                            $info['member'] = $member;
                        }
                        $msgData['live_info'] = $info;
                    }

                    // Echo to the sender, then push to the recipient if a binding exists
                    $this->sendMsg($frameId, ['success' => true, 'op' => 'push', 'scene' => $scene, 'data' => $msgData, 'message' => '发送成功:' . $frameId]);
                    $toBindData = RedisService::get("chats:bind:{$scene}_{$toUid}");
                    $toFd = isset($toBindData['fd']) ? $toBindData['fd'] : 0;
                    if ($toBindData && $toFd) {
                        $this->sendMsg($toFd, ['success' => true, 'op' => 'push', 'scene' => $scene, 'data' => $msgData, 'message' => '推送消息成功:' . $toFd]);
                        $this->info("【{$date}】Socket:客户端【{$frameId}-{$fromUid}】推送消息给【{$toFd}-{$toUid}。");
                    }
                    break;

                case 'live': // live-room chat
                    // Cast like the 'chat' branch does, so a non-numeric value cannot slip past the <= 0 check
                    $liveId = isset($data['live_id']) ? intval($data['live_id']) : 0;
                    $message = isset($data['message']) ? trim($data['message']) : '';

                    // Parameter validation
                    if ($toUid <= 0 || $fromUid <= 0 || $liveId <= 0 || empty($message)) {
                        $this->info("【{$scene} {$date}】Socket:参数错误,from@{$fromUid}-to@{$toUid}。");
                        $this->sendMsg($frameId, ['success' => false, 'op' => 'push', 'scene' => $scene, 'data' => $data, 'message' => '参数错误']);
                        return false;
                    }

                    $msgData = [
                        'from_uid' => $fromUid,
                        'to_uid' => $toUid,
                        'msg_type' => 1,
                        'live_id' => $liveId,
                        'message' => $message,
                        'chat_key' => $chatKey,
                        'create_time' => time(),
                        'update_time' => time(),
                        'status' => 1
                    ];

                    // Persist before pushing
                    if (!LiveChatModel::insertGetId($msgData)) {
                        $this->sendMsg($frameId, ['success' => false, 'op' => 'push', 'scene' => $scene, 'data' => $msgData, 'message' => '消息发送失败']);
                        return false;
                    }

                    // Echo to the sender, then push to the recipient if a binding exists
                    $msgData['time_text'] = dateFormat($msgData['create_time']);
                    $this->sendMsg($frameId, ['success' => true, 'op' => 'push_live', 'scene' => $scene, 'data' => $msgData, 'message' => '发送成功:' . $frameId]);
                    $toBindData = RedisService::get("chats:bind:{$scene}_{$toUid}");
                    $toFd = isset($toBindData['fd']) ? $toBindData['fd'] : 0;
                    if ($toBindData && $toFd) {
                        $this->sendMsg($toFd, ['success' => true, 'op' => 'push_live', 'scene' => $scene, 'data' => $msgData, 'message' => '推送消息成功:' . $toFd]);
                        $this->info("【{$scene} {$date}】Socket:客户端【{$frameId}-{$fromUid}】推送消息给【{$toFd}-{$toUid}。");
                    }
                    break;

                case 'login': // login acknowledgement (the binding above has already been stored)
                    $this->info("【{$scene} {$date}】Socket:登录成功【{$frameId}-{$fromUid}-{$op}】。");
                    $this->sendMsg($frameId, ['success' => true, 'op' => $op, 'scene' => $scene, 'message' => '登录成功', 'data' => $data, 't' => time()]);
                    break;

                default: // unknown operation
                    $this->sendMsg($frameId, ['success' => false, 'message' => 'ok', 'scene' => $scene, 'data' => $data, 't' => time()]);
                    break;
            }

            $this->info("【{$scene} {$date}】Socket:客户端【{$frameId}】消息处理成功");
        } catch (\Throwable $exception) {
            // \Throwable rather than \Exception: a malformed client payload can raise a
            // TypeError (e.g. trim() on an array), which must not escape the handler.
            RedisService::set("caches:sockets:error_{$frameId}", ['error' => $exception->getMessage(), 'trace' => $exception->getTrace(), 'date' => $date], 7200);
            $this->info("【{$scene} {$date}】Socket:客户端【{$frameId}】消息处理错误 " . $exception->getMessage());
        }
    }

    /**
     * Signature check: compares $data['sign'] with the value getSign($data) computes.
     *
     * @param $data payload that may carry a 'sign' entry
     * @return bool true when both values are loosely equal
     */
    public function checkSign($data)
    {
        $checkSign = isset($data['sign']) ? $data['sign'] : '';

        return getSign($data) == $checkSign;
    }

    /**
     * Push a JSON-encoded payload to one connection.
     *
     * Nothing is pushed when no "chats:frames:{fd}" entry exists in Redis,
     * i.e. the connection is considered offline.
     *
     * @param $fd   target connection id
     * @param $data payload to encode and push
     * @return false|void false when the target is considered offline
     */
    public function sendMsg($fd, $data)
    {
        $date = date('Y-m-d H:i:s');
        try {
            if (!RedisService::exists("chats:frames:" . $fd)) {
                $this->info("【{$date}】Socket:客户端【{$fd}】推送用户已经掉线 ");
                return false;
            }

            $this->ws->push($fd, json_encode($data, JSON_UNESCAPED_UNICODE));
        } catch (\Exception $exception) {
            $this->info("【{$date}】Socket:客户端【{$fd}】消息处理错误 " . $exception->getMessage());
        }
    }

    /**
     * HTTP request handler. Intentionally empty: requests are accepted but not answered here.
     *
     * @param $request
     * @param $response
     */
    public function request($request, $response)
    {
    }

    /**
     * Connection-close handler: drops the liveness entry for the fd and logs the event.
     *
     * @param $ws server instance passed by the event dispatcher (unused; $this->ws is used instead)
     * @param $fd id of the connection being closed
     */
    public function close($ws, $fd = '')
    {
        $date = date('Y-m-d H:i:s');
        RedisService::clear("chats:frames:" . $fd);
        $this->info("【{$date}】Socket:客户端【{$fd}】连接关闭");
        // NOTE(review): this runs inside the close event itself, so the call may be redundant — confirm.
        $this->ws->close($fd);
    }

    /**
     * Stop the service.
     *
     * Force-kills whatever process is listening on the configured port, then,
     * if this process owns a server instance, shuts it down as well. The kill
     * no longer depends on $this->ws being set, because in the code shown here
     * only start() assigns it, so a separate "stop" invocation would otherwise
     * do nothing.
     */
    public function stop()
    {
        // Kill by port. The port is cast to int so nothing but digits reaches the shell.
        $port = (int) env('SOCKET_PORT', '6530');
        if ($port > 0 && function_exists('exec')) {
            // lsof -t prints bare PIDs; -sTCP:LISTEN restricts the match to the listening
            // server so local clients connected to the port are not killed.
            // kill (not pkill) is used because the values are PIDs, not name patterns.
            exec('pids=$(lsof -t -iTCP:' . $port . ' -sTCP:LISTEN) && kill -9 $pids');
        }

        echo "socket stop...\n";
        $date = date('Y-m-d H:i:s');
        $this->info("【{$date}】Socket:停止运行服务");

        // close() needs a connection id; shutdown() is the call that stops the server itself.
        if (!empty($this->ws)) {
            $this->ws->shutdown();
        }
    }

    /**
     * Log a line to the "swoole" log channel and, optionally, to the console.
     *
     * @param string $data      text to log
     * @param bool   $verbosity when true (and the SWOOLE_LOG env value is truthy) the line is also passed to parent::info()
     */
    public function info($data, $verbosity = true)
    {
        \logger()->channel('swoole')->info($data);
        if (env('SWOOLE_LOG', true) && $verbosity) {
            parent::info($data);
        }
    }
}