argument('op');
        $op = $op ? $op : 'start';
        if ($op == 'start') {
            echo "swoole socket service start ...\n";
            $this->start();
        } else if ($op == 'stop') {
            echo "swoole socket service stop ...\n";
            $this->stop();
        }
    }

    /**
     * Create the WebSocket server, register the event callbacks and run it.
     *
     * The server listens on 0.0.0.0 and on the port given by the SOCKET_PORT
     * environment value (default 6620). Any exception raised while starting
     * is stored in Redis for 10 minutes and logged; it is not rethrown.
     */
    public function start()
    {
        try {
            $this->ws = new \Swoole\WebSocket\Server("0.0.0.0", env('SOCKET_PORT', '6620'));
            // Connection opened.
            $this->ws->on('open', [$this, 'open']);
            // Frame received from a client.
            $this->ws->on('message', [$this, 'message']);
            // Plain HTTP request received on the same port.
            $this->ws->on('request', [$this, 'request']);
            // Connection closed.
            $this->ws->on('close', [$this, 'close']);
            $this->ws->start();
        } catch (\Exception $exception) {
            $date = date('Y-m-d H:i:s');
            RedisService::set("caches:sockets:error", $exception->getMessage(), 600);
            $this->info("【{$date}】Socket:运行异常=》" . $exception->getMessage());
        }
    }

    /**
     * Connection-open callback: greets the client with its connection id.
     *
     * @param mixed $ws      server instance passed by Swoole (unused; $this->ws is used instead)
     * @param mixed $request handshake request; only its `fd` property is read
     */
    public function open($ws, $request)
    {
        $date = date('Y-m-d H:i:s');
        // NOTE(review): 'success' is the string 'true' here but a boolean in every
        // other payload of this class; kept as-is because clients may depend on it.
        $payload = ['success' => 'true', 'op' => 'conn', 'message' => '连接成功', 'fd' => $request->fd];
        $this->ws->push($request->fd, json_encode($payload, JSON_UNESCAPED_UNICODE));
        $this->info("【{$date}】Socket:客户端【{$request->fd}】连接成功");
    }

    /**
     * Message callback: answers heartbeats and dispatches JSON messages by `op`.
     *
     * Supported ops: `chat`, `video_connect`, `video`, `login`, `video-login`;
     * anything else gets a generic reply. All ops except `video_connect` and
     * `video` must carry a valid signature (see checkSign()).
     *
     * @param mixed $ws    server instance passed by Swoole (unused)
     * @param mixed $frame received frame; its `fd` and `data` properties are read
     * @return false|null  false when the message is rejected or is a heartbeat, null otherwise
     */
    public function message($ws, $frame)
    {
        $date = date('Y-m-d H:i:s');
        // Mark the connection as alive; sendMsg() refuses to push to fds without this key.
        RedisService::set("chats:frames:" . $frame->fd, json_decode($frame->data, true), 86400);
        if ($frame->data === 'ping') {
            $this->ws->push($frame->fd, 'pong');
            $this->info("【{$date}】Socket:客户端【{$frame->fd}】心跳包", false);
            return false;
        }

        $frameId = $frame->fd;
        $data = $frame->data ? json_decode($frame->data, true) : [];
        // Invalid JSON decodes to null and a bare JSON scalar to a scalar; the code
        // below indexes and extends $data, so anything that is not an array becomes [].
        if (!is_array($data)) {
            $data = [];
        }
        $fromUid = isset($data['from_user_id']) ? intval($data['from_user_id']) : 0;
        // Only a string op is accepted: with loose comparison an integer 0 or a
        // boolean true would match the signature whitelist and the switch cases.
        $op = isset($data['op']) && is_string($data['op']) ? $data['op'] : '';

        // NOTE(review): video ops skip the signature check yet still rebind fds and
        // write chat records below — confirm this exemption is intentional.
        if (!in_array($op, ['video_connect', 'video'], true) && !$this->checkSign($data)) {
            $this->info("【{$date}】Socket:签名失败【{$frameId}-{$fromUid}】");
            return false;
        }

        $uuid = isset($data['uuid']) ? $data['uuid'] : uniqid();
        $toUid = isset($data['to_user_id']) ? intval($data['to_user_id']) : 0;
        $apiUrl = $this->scalarString($data, 'api_url');
        $chatKey = $this->scalarString($data, 'chat_key');
        $chatKey = $chatKey ? $chatKey : getChatKey($fromUid, $toUid);

        try {
            // Remember which fd the sender is using, per conversation and per user.
            if ($fromUid && $frameId) {
                $fds = RedisService::get("chats:bindFds:{$chatKey}");
                $fds = $fds ? $fds : [];
                $fds[$op . '_' . $fromUid] = $frameId;
                RedisService::set("chats:bindFds:{$chatKey}", $fds, 86400);
                RedisService::set("chats:bind:{$fromUid}", ['fd' => $frameId, 'user_id' => $fromUid, 'uuid' => $uuid, 'chat_key' => $chatKey], 86400);
            }

            switch ($op) {
                case 'chat': // text chat
                    $msgType = isset($data['msg_type']) ? $data['msg_type'] : 1;
                    $message = $this->scalarString($data, 'message');

                    // Quota check: reply with a purchase prompt when the sender may not chat.
                    if (!ImChatService::make()->checkChat($fromUid, $toUid, 1)) {
                        // The config value is read as "<money>/<count>"; both parts default to 10.
                        $config = ConfigService::make()->getConfigByCode('chat_buy_num_money');
                        $config = $config ? explode('/', $config) : [];
                        $money = isset($config[0]) && $config[0] > 0 ? $config[0] : 10;
                        $num = isset($config[1]) && $config[1] > 0 ? $config[1] : 10;
                        $tips = "抱歉您的免费次数已经用完,请先余额支付【{$money}元】购买【{$num}次】聊天次数再使用聊天服务。";
                        $this->info("【{$date}】Socket:次数不足【{$frameId}-{$fromUid}】。");
                        $this->sendMsg($frameId, ['success' => true, 'op' => 'buy', 'data' => ['type' => 1, 'tips' => $tips], 'message' => '次数不足,需要付费']);
                        return false;
                    }

                    // Parameter validation.
                    $message = MessageService::make()->filterMessage($message, 1);
                    if ($toUid <= 0 || $fromUid <= 0 || empty($message)) {
                        $this->info("【{$date}】Socket:参数错误,from@{$fromUid}-to@{$toUid}。");
                        $this->sendMsg($frameId, ['success' => false, 'message' => '参数错误']);
                        return false;
                    }

                    $now = time();
                    $data = [
                        'from_user_id' => $fromUid,
                        'to_user_id' => $toUid,
                        'msg_type' => $msgType,
                        'description' => $msgType == 1 ? mb_substr($message, 0, 20) : '',
                        // Non-text messages are passed through get_image_path().
                        'message' => $msgType != 1 && $message ? get_image_path($message) : $message,
                        'chat_key' => $chatKey,
                        'create_time' => $now,
                        'update_time' => $now,
                        'is_read' => 2,
                        'from_is_show' => 1,
                        'to_is_show' => 1,
                        'status' => 1,
                    ];
                    if (!ImChatModel::insertGetId($data)) {
                        $data = ['success' => false, 'message' => '消息发送失败'];
                        $this->sendMsg($frameId, $data);
                        return false;
                    }

                    // Echo to the sender, then push to the recipient's bound fd, if any.
                    $data = $this->withChatProfiles($data, $fromUid, $toUid, $apiUrl);
                    $data['time_text'] = dateFormat($data['create_time']);
                    $this->sendMsg($frameId, ['success' => true, 'op' => $op, 'data' => $data, 'message' => '发送成功:' . $frameId]);
                    $toBindData = RedisService::get("chats:bind:{$toUid}");
                    $toFd = isset($toBindData['fd']) ? $toBindData['fd'] : 0;
                    if ($toBindData && $toFd) {
                        $this->sendMsg($toFd, ['success' => true, 'op' => $op, 'data' => $data, 'message' => '推送消息成功:' . $toFd]);
                        $this->info("【{$date}】Socket:客户端【{$frameId}-{$fromUid}】推送消息给【{$toFd}-{$toUid}。");
                    }

                    // Record the consumed chat quota.
                    ImChatService::make()->updateChatParams($fromUid, $toUid, 1, 1);
                    break;

                case 'video_connect': // video call signalling
                    $data = $this->withChatProfiles($data, $fromUid, $toUid, $apiUrl);
                    $data['time_text'] = dateFormat(time());
                    // 'message' is optional for this op; read it safely so that logging
                    // can never abort the push to the peers.
                    $note = isset($data['message']) && is_scalar($data['message']) ? $data['message'] : '';

                    // Targets: fds bound to this conversation plus fds bound under "0_<toUid>".
                    $fds = RedisService::get("chats:bindFds:{$chatKey}");
                    $toFds = RedisService::get("chats:bindFds:0_{$toUid}");
                    $toFds = $toFds ? array_values($toFds) : [];
                    $fds = $fds ? array_values($fds) : [];
                    $fds = $toFds ? array_merge($toFds, $fds) : $fds;
                    $this->sendMsg($frameId, ['success' => true, 'op' => $op, 'data' => $data, 'message' => '发送成功:' . $frameId]);
                    $this->info("【{$date}】Socket:客户端【{$frameId}-{$fromUid}】@{$note},消息群推给," . json_encode($fds));
                    if ($fds) {
                        $fds = array_unique($fds);
                        foreach ($fds as $toFd) {
                            // The sender has already received its own copy above.
                            if ($toFd != $frameId) {
                                $this->sendMsg($toFd, ['success' => true, 'op' => $op, 'data' => $data, 'message' => '推送视频通话消息成功:' . $toFd . '-' . $frameId]);
                                $this->info("【{$date}】Socket:客户端【{$toFd}】@{$note},推送视频消息,来源【{$frameId}】");
                            }
                        }
                        $this->info("【{$date}】Socket:客户端【{$frameId}-{$fromUid}】@{$note},群推结束");
                    }
                    break;

                case 'video': // video/voice call finished
                    $type = isset($data['type']) ? intval($data['type']) : 1;
                    $status = isset($data['status']) ? intval($data['status']) : 2;
                    $videoTime = isset($data['video_time']) ? intval($data['video_time']) : 0;
                    $message = !empty($data['message']) ? $this->scalarString($data, 'message') : '通话结束';
                    if ($toUid <= 0 || $fromUid <= 0 || empty($message) || $status <= 0) {
                        $this->sendMsg($frameId, ['success' => false, 'message' => '参数错误']);
                        return false;
                    }

                    $now = time();
                    $data = [
                        'from_user_id' => $fromUid,
                        'to_user_id' => $toUid,
                        'msg_type' => 5,
                        'description' => $message,
                        'message' => $message,
                        'chat_key' => $chatKey,
                        'create_time' => $now,
                        'update_time' => $now,
                        'is_read' => 2,
                        'from_is_show' => 1,
                        'to_is_show' => 1,
                        'is_connect' => $status, // 1 = connected, 2 = rejected, 3 = cancelled
                        'video_time' => $videoTime, // call duration in seconds
                        'status' => 1,
                    ];
                    // type 2: the record is stored with sender and recipient swapped.
                    if ($type == 2) {
                        $data['from_user_id'] = $toUid;
                        $data['to_user_id'] = $fromUid;
                    }
                    if (!ImChatModel::insertGetId($data)) {
                        $data = ['success' => false, 'message' => '视频消息发送失败'];
                        $this->sendMsg($frameId, $data);
                        return false;
                    }

                    $data = $this->withChatProfiles($data, $data['from_user_id'], $data['to_user_id'], $apiUrl);
                    $data['time_text'] = dateFormat($data['create_time']);
                    $data['video_time_text'] = $this->formatVideoTime($videoTime);

                    // Targets: conversation fds plus fds bound under "0_<fromUid>" and "0_<toUid>".
                    $fds = RedisService::get("chats:bindFds:{$chatKey}");
                    $fromFds = RedisService::get("chats:bindFds:0_{$fromUid}");
                    $toFds = RedisService::get("chats:bindFds:0_{$toUid}");
                    $fromFds = $fromFds ? array_values($fromFds) : [];
                    $toFds = $toFds ? array_values($toFds) : [];
                    $fds = $fds ? array_values($fds) : [];
                    $fds = $fromFds ? array_merge($fromFds, $fds) : $fds;
                    $fds = $toFds ? array_merge($toFds, $fds) : $fds;
                    $this->sendMsg($frameId, ['success' => true, 'op' => $op, 'data' => $data, 'message' => '发送成功:' . $frameId]);
                    $this->info("【{$date}】Socket:客户端【{$frameId}-{$fromUid}】@{$data['message']},消息群推给," . json_encode($fds));
                    if ($fds) {
                        $fds = array_unique($fds);
                        foreach ($fds as $toFd) {
                            // For type 2 the sender's own fd is pushed to as well.
                            if ($toFd != $frameId || $type == 2) {
                                $this->sendMsg($toFd, ['success' => true, 'op' => $op, 'data' => $data, 'message' => '推送视频通话消息成功:' . $toFd . '-来源' . $frameId]);
                                $this->info("【{$date}】Socket:客户端【{$toFd}】@{$data['message']},推送视频消息,来源【{$frameId}】");
                            }
                        }
                        $this->info("【{$date}】Socket:客户端【{$frameId}-{$fromUid}】@{$data['message']},消息群推结束");
                    }

                    // A connected call with a duration consumes call time.
                    if ($status == 1 && $videoTime > 0) {
                        if ($type == 2) {
                            ImChatService::make()->updateChatParams($toUid, $fromUid, $videoTime, 2);
                        } else {
                            ImChatService::make()->updateChatParams($fromUid, $toUid, $videoTime, 2);
                        }
                    }
                    break;

                case 'video-login': // login
                case 'login': // login
                    $this->info("【{$date}】Socket:登录成功【{$frameId}-{$fromUid}-{$op}】。");
                    $this->sendMsg($frameId, ['success' => true, 'op' => $op, 'message' => '登录成功', 'data' => $data, 't' => time()]);
                    break;

                default:
                    $this->sendMsg($frameId, ['success' => false, 'message' => 'ok', 'data' => $data, 't' => time()]);
                    break;
            }
            $this->info("【{$date}】Socket:客户端【{$frameId}】消息处理成功");
        } catch (\Exception $exception) {
            RedisService::set("caches:sockets:error_{$frameId}", ['error' => $exception->getTrace(), 'date' => $date], 7200);
            $this->info("【{$date}】Socket:客户端【{$frameId}】消息处理错误 " . $exception->getMessage());
        }
    }

    /**
     * Verify the signature carried in a message payload.
     *
     * The client value (`sign`) must be a non-empty string and must match the
     * value computed by getSign(); the comparison is strict and timing-safe.
     *
     * @param mixed $data decoded message payload
     * @return bool true when the signature matches
     */
    public function checkSign($data)
    {
        $checkSign = isset($data['sign']) ? $data['sign'] : '';
        $sign = getSign($data);
        // Loose comparison would accept "magic hash" pairs ("0e1" == "0e2") and
        // empty-vs-falsy matches, so require real strings on both sides.
        if (!is_string($checkSign) || $checkSign === '' || !is_scalar($sign)) {
            return false;
        }

        return hash_equals((string) $sign, $checkSign);
    }

    /**
     * Push a JSON-encoded payload to a client.
     *
     * Nothing is pushed when the target fd has no `chats:frames:<fd>` key in
     * Redis (the client is treated as offline). Exceptions are logged, not rethrown.
     *
     * @param int|string $fd   target connection id
     * @param mixed      $data payload to JSON-encode
     * @return false|null false when the client is offline, null otherwise
     */
    public function sendMsg($fd, $data)
    {
        $date = date('Y-m-d H:i:s');
        try {
            if (!RedisService::exists("chats:frames:" . $fd)) {
                $this->info("【{$date}】Socket:客户端【{$fd}】推送用户已经掉线 ");
                return false;
            }
            $this->ws->push($fd, json_encode($data, JSON_UNESCAPED_UNICODE));
        } catch (\Exception $exception) {
            $this->info("【{$date}】Socket:客户端【{$fd}】消息处理错误 " . $exception->getMessage());
        }
    }

    /**
     * HTTP request callback; intentionally does nothing.
     *
     * @param mixed $request
     * @param mixed $response
     */
    public function request($request, $response)
    {
    }

    /**
     * Connection-close callback: forgets the connection's alive marker.
     *
     * @param mixed      $ws server instance passed by Swoole (unused)
     * @param int|string $fd connection id being closed
     */
    public function close($ws, $fd = '')
    {
        $date = date('Y-m-d H:i:s');
        RedisService::clear("chats:frames:" . $fd);
        $this->info("【{$date}】Socket:客户端【{$fd}】连接关闭");
        // NOTE(review): this runs inside the close callback, so the connection is
        // presumably already closing and this call may be redundant — confirm.
        $this->ws->close($fd);
    }

    /**
     * Stop the running server.
     *
     * NOTE(review): $this->ws is only assigned in start(), so this has an effect
     * only in the process that started the server — confirm how `stop` is meant
     * to reach a server started by another CLI invocation.
     */
    public function stop()
    {
        if ($this->ws) {
            $date = date('Y-m-d H:i:s');
            $this->info("【{$date}】Socket:停止运行服务");
            // close() needs an fd and only closes one connection; shutdown() stops the server.
            $this->ws->shutdown();
        }
    }

    /**
     * Log a line to the `swoole` channel and optionally echo it to the console.
     *
     * @param string $data      text to log
     * @param bool   $verbosity when false the line is logged but not echoed
     */
    public function info($data, $verbosity = true)
    {
        \logger()->channel('swoole')->info($data);
        if (env('SWOOLE_LOG', true) && $verbosity) {
            parent::info($data);
        }
    }

    /**
     * Read a payload field as a trimmed string; missing or non-scalar values yield ''.
     *
     * @param array  $data decoded message payload
     * @param string $key  field name
     * @return string
     */
    private function scalarString(array $data, $key)
    {
        return isset($data[$key]) && is_scalar($data[$key]) ? trim((string) $data[$key]) : '';
    }

    /**
     * Add the nickname and avatar of both chat parties to a payload.
     *
     * @param array  $data    payload to extend
     * @param int    $fromUid user whose profile fills the from_* keys
     * @param int    $toUid   user whose profile fills the to_* keys
     * @param string $apiUrl  forwarded unchanged to MemberService::getChatInfo()
     * @return array the payload with from_nickname, from_avatar, to_nickname, to_avatar set
     */
    private function withChatProfiles(array $data, $fromUid, $toUid, $apiUrl)
    {
        $toInfo = MemberService::make()->getChatInfo($toUid, '', $apiUrl);
        $fromInfo = MemberService::make()->getChatInfo($fromUid, '', $apiUrl);
        $data['from_nickname'] = isset($fromInfo['nickname']) ? $fromInfo['nickname'] : '';
        $data['from_avatar'] = isset($fromInfo['avatar']) ? $fromInfo['avatar'] : '';
        $data['to_nickname'] = isset($toInfo['nickname']) ? $toInfo['nickname'] : '';
        $data['to_avatar'] = isset($toInfo['avatar']) ? $toInfo['avatar'] : '';

        return $data;
    }

    /**
     * Format a call duration as "MM:SS", or "HH:MM:SS" once it reaches one hour.
     *
     * @param int $seconds duration in seconds
     * @return string formatted duration, or '' when the duration is not positive
     */
    private function formatVideoTime($seconds)
    {
        $seconds = intval($seconds);
        if ($seconds <= 0) {
            return '';
        }

        $hours = intval($seconds / 3600);
        $minutes = intval($seconds % 3600 / 60);
        $secs = $seconds % 60;
        if ($hours > 0) {
            return sprintf('%02d:%02d:%02d', $hours, $minutes, $secs);
        }

        return sprintf('%02d:%02d', $minutes, $secs);
    }
}