argument('op');
$op = $op ? $op : 'start';
if ($op == 'start') {
echo "swoole chat service start ...\n";
$this->start();
} else if ($op == 'stop') {
echo "swoole chat service stop ...\n";
$this->stop();
}
}
/**
* 运行
*/
public function start()
{
try {
//创建websocket服务器对象,监听0.0.0.0:7104端口
// HTTPS 通过nginx 配置证书即可
$this->ws = new \Swoole\WebSocket\Server("0.0.0.0", env('SOCKET_PORT', '8660'));
//监听WebSocket连接打开事件
$this->ws->on('open', [$this, 'open']);
//监听WebSocket消息事件
$this->ws->on('message', [$this, 'message']);
//监听WebSocket主动推送消息事件
$this->ws->on('request', [$this, 'request']);
//监听WebSocket连接关闭事件
$this->ws->on('close', [$this, 'close']);
$this->ws->start();
} catch (\Exception $exception) {
$date = date('Y-m-d H:i:s');
RedisService::set("caches:sockets:error", $exception->getMessage(), 600);
$this->info("【{$date}】Socket:运行异常=》" . $exception->getMessage());
}
}
/**
* 建立连接
* @param $ws
* @param $request
*/
public function open($ws, $request)
{
$date = date('Y-m-d H:i:s');
$logFile = '/storage/logs/swoole-task-'.date('Y-m-d', time() - 86400).'.log';
if(file_exists(base_path().$logFile)){
unlink(base_path().$logFile);
}
$fdData = RedisService::get("chats:bind:chat_1");
$customFd = $fdData && isset($fdData['fd'])? $fdData['fd'] : 0;
$checkFd = RedisService::get("chats:frames:" . $customFd);
RedisService::clear('chats:checkCustom:from_'.$request->fd);
$this->ws->push($request->fd, json_encode(['success' => 'true', 'op' => 'conn', 'message' => '连接成功','custom_fd'=>$checkFd && $customFd? $customFd:0, 'fd' => $request->fd], 256));
$this->info("【{$date}】Socket:客户端【{$request->fd}】连接成功");
}
/**
* 接收消息
* @param $ws
* @param $frame
*/
public function message($ws, $frame)
{
$date = date('Y-m-d H:i:s');
RedisService::set("chats:frames:" . $frame->fd, json_decode($frame->data, true), 86400);
$fdData = RedisService::get("chats:bind:chat_1");
$customFd = $fdData && isset($fdData['fd'])? $fdData['fd'] : 0;
if ($frame->data == 'ping') {
$this->ws->push($frame->fd, 'pong');
if($customFd != $frame->fd){
$this->sendMsg($frame->fd, ['success' => true,'op'=>'custom', 'message' => $customFd?'客服上线':'客服离线', 'scene'=>'check', 'data' => ['online'=>$customFd,'to_fd'=>$customFd], 't' => time()]);
}
// 推送客服保证金信息
if($customFd == $frame->fd){
$messages = MessageService::make()->getUnreadList(0);
$count = isset($messages['count'])? $messages['count'] : 0;
if($count>0){
$this->sendMsg($frame->fd, ['success' => true,'op'=>'notice', 'message' => '消息通知', 'scene'=>'deposit', 'data' => $messages, 't' => time()]);
}
}
$this->info("【{$date}】Socket:客户端【{$frame->fd}】心跳包",false);
return false;
}
// 推送客服保证金信息
else if($customFd && RedisService::get("chats:frames:" . $customFd)){
$messages = MessageService::make()->getUnreadList(0);
$count = isset($messages['count'])? $messages['count'] : 0;
if($count>0){
$this->sendMsg($customFd, ['success' => true,'op'=>'notice', 'message' => '消息通知', 'scene'=>'deposit', 'data' => $messages, 't' => time()]);
}
}
// 消息处理
$frameId = $frame->fd;
$data = $frame->data ? json_decode($frame->data, true) : [];
$fromUid = isset($data['from_uid']) ? intval($data['from_uid']) : 0;
$isCustom = isset($data['is_custom']) ? intval($data['is_custom']) : 0;
$token = isset($data['token']) ? $data['token'] : '';
$op = isset($data['op']) ? $data['op'] : '';
$scene = isset($data['scene']) && $data['scene'] ? $data['scene'] : 'chat';
$toUid = isset($data['to_uid']) ? intval($data['to_uid']) : 0;
$jwt = new Jwt('jwt_jd_app');
$userId = $jwt->verifyToken($token);
if (!$isCustom && $userId != $fromUid) {
$this->info("【{$scene} {$date}】Socket:请先登录再连接【{$frameId}-{$fromUid}】");
$this->sendMsg($frameId, ['success' => false,'op'=>'error', 'message' => '请先登录', 'scene'=>$scene, 'data' => $data, 't' => time()]);
return false;
}
if ($op != 'login' && ($toUid<=0 || $fromUid == $toUid)) {
$this->info("【{$scene} {$date}】Socket:参数错误【{$fromUid}-{$toUid}】");
$this->sendMsg($frameId, ['success' => false,'op'=>'error', 'message' => '参数错误,请先选择回复用户~', 'scene'=>$scene, 'data' => $data, 't' => time()]);
return false;
}
// 签名验证
$system = isset($data['system']) ? $data['system'] : [];
$uuid = isset($system['uuid'])? $system['uuid'] : 0;
$ctime = isset($system['ct'])? $system['ct'] : 0;
$message = isset($data['message']) ? trim($data['message']) : '';
$sign = isset($data['sign']) ? trim($data['sign']) : '';
$checkSign = getSign($op.'&'.$message.'&'.$fromUid.'&'.$uuid.'&'.$ctime);
if($checkSign != $sign){
$this->info("【{$scene} {$date}】Socket:签名失败【{$frameId}-{$fromUid}】");
$this->sendMsg($frameId, ['success' => false,'op'=>'error', 'message' => '请求签名失败', 'scene'=>$scene,'sign'=>$checkSign, 'data' => $data, 't' => time()]);
return false;
}
$apiUrl = env('APP_URL','');
$chatKey = isset($data['chat_key']) ? trim($data['chat_key']) : '';
$chatKey = $chatKey ? $chatKey : getChatKey($fromUid, $toUid);
try {
// 推送Fd处理
if ($fromUid && $frameId) {
RedisService::set("chats:bind:{$scene}_{$fromUid}", ['fd' => $frameId,'scene'=>$scene, 'user_id' => $fromUid, 'uuid' => $uuid, 'chat_key' => $chatKey], 3600);
}
switch ($op) {
case 'chat': // 图文聊天
$msgType = isset($data['msg_type']) ? $data['msg_type'] : 1;
// 发送参数验证
if ($fromUid <= 0 || empty($message)) {
$this->info("【{$scene} {$date}】Chat:参数错误,from@{$fromUid}-to@{$toUid}。");
$this->sendMsg($frameId, ['success' => false,'op'=>'push','scene'=>$scene,'data'=>$data, 'message' => '参数错误']);
return false;
}
// 敏感词过滤
$message = \App\Services\Api\MessageService::make()->checkMessage($message);
// 用户私聊
$fromInfo = [];
if($fromUid>1){
$fromInfo = MemberService::make()->getInfo(['id'=> $fromUid,'status'=>1],[],false);
if(empty($fromInfo)){
$this->info("【{$scene} {$date}】Chat:发送用户不存在,from@{$fromUid}-to@{$toUid}。");
$this->sendMsg($frameId, ['success' => false,'op'=>'push','scene'=>$scene,'data'=>$data, 'message' => '您的账号不可用或已冻结,请联系客服']);
return false;
}
}
$toInfo = [];
if($toUid>1){
$toInfo = MemberService::make()->getInfo(['id'=> $toUid,'status'=>1],[],false);
if(empty($toInfo)){
$this->info("【{$scene} {$date}】Chat:接收用户不存在,from@{$fromUid}-to@{$toUid}。");
$this->sendMsg($frameId, ['success' => false,'op'=>'push','scene'=>$scene,'data'=>$data, 'message' => '对方账号不可用或无法接收消息']);
return false;
}
}
$fromUserName = isset($fromInfo['realname'])? $fromInfo['realname'] : ($fromUid>1? '用户'.$fromUid:'客服');
$fromAvatar = isset($fromInfo['avatar']) && $fromInfo['avatar']? $fromInfo['avatar'] : get_image_url($fromUid>1?'/images/member/logo.png':'/images/member/custom.png');
$toUserName = isset($toInfo['realname'])? $toInfo['realname'] : ($toUid>1? '用户'.$toUid:'客服');
$toAvatar = isset($toInfo['avatar']) && $toInfo['avatar']? $toInfo['avatar'] : get_image_url($toUid>1?'/images/member/logo.png':'/images/member/custom.png');
$msgData = [
'from_uid' => $fromUid,
'to_uid' => $toUid,
'type' => 1,
'msg_type' => $msgType,
'title' => '聊天消息',
'description' => $msgType == 2 ? '[图片]' : mb_substr($message, 0, 20),
'content' => $msgType==2? get_image_path($message) : $message,
'chat_key' => $chatKey,
'create_time' => time(),
'update_time' => time(),
'is_read' => 2,
'status' => 1
];
// 敏感消息
if(empty($message)){
$msgData['from_user_name'] = $fromUserName;
$msgData['from_user_avatar'] = get_image_url($fromAvatar, $apiUrl);
$msgData['to_user_name'] = $toUserName;
$msgData['to_user_avatar'] = get_image_url($toAvatar, $apiUrl);
$msgData['time_text'] = dateFormat($msgData['create_time']);
$msgData['content'] = '敏感内容';
$this->sendMsg($frameId, ['success' => true, 'op' => 'push', 'scene'=> $scene,'custom_fd'=>$customFd, 'data' => $msgData, 'message' => '发送成功:' . $frameId]);
return false;
}
if (!$id = MessageModel::insertGetId($msgData)) {
$data = ['success' => false,'op'=>'push','scene'=>$scene,'data'=>$msgData, 'message' => '消息发送失败'];
$this->sendMsg($frameId, $data);
return false;
}
// 推送消息给对方
$msgData['from_user_name'] = $fromUserName;
$msgData['from_user_avatar'] = get_image_url($fromAvatar, $apiUrl);
$msgData['to_user_name'] = $toUserName;
$msgData['to_user_avatar'] = get_image_url($toAvatar, $apiUrl);
$msgData['time_text'] = dateFormat($msgData['create_time']);
if($msgData['msg_type'] == 1 || $msgData['msg_type'] == 3){
$msgData['content'] = format_message($msgData['content']);
}else if($msgData['msg_type'] == 2){
$msgData['content'] = $msgData['content']? get_image_url($msgData['content'],$apiUrl) : '';
}else if($msgData['msg_type'] == 4){
$msgData['content'] = $msgData['content']? json_decode($msgData['content'], true) : [];
}
// 接收方缓存
RedisService::clear("caches:messages:unread_count_{$toUid}");
// 返回自身消息
$this->sendMsg($frameId, ['success' => true, 'op' => 'push', 'scene'=> $scene,'custom_fd'=>$customFd, 'data' => $msgData, 'message' => '发送成功:' . $frameId]);
// 推送给对方消息
$pushCustom = ConfigService::make()->getConfigByCode('push_custom_message',1);
$toBindData = RedisService::get("chats:bind:{$scene}_{$toUid}");
$toFd = isset($toBindData['fd']) ? $toBindData['fd'] : 0;
if ($toBindData && $toFd) {
$this->sendMsg($toFd, ['success' => true, 'op' => 'push' ,'scene'=> $scene,'custom_fd'=>$customFd, 'data' => $msgData, 'message' => '推送消息成功:' . $toFd]);
$this->info("【{$date}】Chat:客户端【{$frameId}-{$fromUid}】推送消息给【{$toFd}-{$toUid}。");
}
// 如果开启离线推送,或一直推送,根据关键词推送客服消息
if((($pushCustom == 1 && !$toFd)||($pushCustom==2) && ($msgType == 1) )&& $fromUid >1){
// 搜索内容
$replyContent = ConfigService::make()->getContentByKw($message);
$sendData = $msgData = [
'from_uid' => 1,
'to_uid' => $fromUid,
'type' => 1,
'msg_type' => 3,
'title' => '客服消息',
'description' => mb_substr($message, 0, 20),
'content' => $replyContent,
'chat_key' => $chatKey,
'create_time' => time(),
'update_time' => time(),
'is_read' => 1,
'status' => 1
];
$sendData['from_user_name'] = $toUserName;
$sendData['from_user_avatar'] = $toAvatar;
$sendData['to_user_name'] = $fromUserName;
$sendData['to_user_avatar'] = $fromAvatar;
$sendData['content'] = format_message($replyContent);
$sendData['time_text'] = dateFormat($msgData['create_time']);
if($replyContent){
if (MessageModel::insertGetId($msgData)) {
$this->sendMsg($frameId, ['success' => true, 'op' => 'push', 'scene'=> $scene, 'data' => $sendData, 'message' => '回复成功:' . $frameId]);
}
}else{
$replyContent = ConfigService::make()->getConfigByCode('custom_offline_reply','');
if($replyContent){
$sendData['content'] = format_message($replyContent);
$this->sendMsg($frameId, ['success' => true, 'op' => 'push', 'scene'=> $scene, 'data' => $sendData, 'message' => '回复成功:' . $frameId]);
}
}
}
break;
case 'refund_apply': // 保证金退款申请消息通知
$orderId = isset($data['order_id'])? $data['order_id'] : 0;
// 发送参数验证
if ($fromUid <= 0 || empty($message) || empty($orderId)) {
$this->info("【{$scene} {$date}】Message:参数错误,from@{$fromUid}-to@{$toUid}。");
$this->sendMsg($frameId, ['success' => false,'op'=>'notice','scene'=>$scene,'data'=>$data, 'message' => '参数错误']);
return false;
}
// 订单信息
$orderInfo = DepositService::make()->getInfo($orderId);
$orderNo = isset($orderInfo['refund_no'])? $orderInfo['refund_no'] : '';
$orderMoney = isset($orderInfo['refund_money'])? $orderInfo['refund_money'] : 0;
if(empty($orderInfo) || $orderMoney<=0){
$this->info("【{$scene} {$date}】Message:订单信息不存在,from@{$fromUid}-to@{$toUid}-{$orderId}。");
$this->sendMsg($frameId, ['success' => false,'op'=>'notice','scene'=>$scene,'data'=>$data, 'message' => '订单信息不存在']);
return false;
}
$msgData = [
'from_uid' => $fromUid,
'to_uid' => $toUid,
'type' => 4,
'msg_type' => 4,
'title' => '保证金退款申请消息',
'description' => $message,
'order_no' => $orderNo,
'content' => json_encode([
'title'=> $message.'查看订单',
'order_no'=> $orderNo,
'money'=> $orderMoney,
'date'=> date('Y-m-d H:i:s'),
'user_id'=> $fromUid,
'type'=> 'deposit',
'remark'=> '退保申请',
],256),
'chat_key' => $chatKey,
'create_time' => time(),
'update_time' => time(),
'is_read' => 2,
'status' => 1
];
if (!$id = MessageModel::insertGetId($msgData)) {
$data = ['success' => false,'op'=>'notice','scene'=>$scene,'data'=>$msgData, 'message' => '消息发送失败'];
$this->sendMsg($frameId, $data);
return false;
}
// 推送消息给对方
$msgData['time_text'] = dateFormat($msgData['create_time']);
$msgData['content'] = $msgData['content']? json_decode($msgData['content'], true) : [];
// 返回自身消息
$this->sendMsg($frameId, ['success' => true, 'op' => 'notice', 'scene'=> $scene,'custom_fd'=>$customFd, 'data' => $msgData, 'message' => '发送成功:' . $frameId]);
// 推送给对方消息
$toBindData = RedisService::get("chats:bind:{$scene}_{$toUid}");
$toFd = isset($toBindData['fd']) ? $toBindData['fd'] : 0;
if ($toBindData && $toFd) {
$this->sendMsg($toFd, ['success' => true, 'op' => 'notice' ,'scene'=> $scene,'custom_fd'=>$customFd, 'data' => ['count'=>1,'list'=>[$msgData]], 'message' => '推送消息成功:' . $toFd]);
$this->info("【{$date}】Message:客户端【{$frameId}-{$fromUid}】推送消息给【{$toFd}-{$toUid}。");
}
break;
case 'notice': // 其他消息通知
$type = isset($data['type']) ? $data['type'] : 5;
$order = isset($data['order']) ? $data['order'] : [];
// 发送参数验证
if ($fromUid <= 0 || empty($message) || empty($order)) {
$this->info("【{$scene} {$date}】Message:参数错误,from@{$fromUid}-to@{$toUid}。");
$this->sendMsg($frameId, ['success' => false,'op'=>'notice','scene'=>$scene,'data'=>$data, 'message' => '参数错误']);
return false;
}
$msgData = [
'from_uid' => $fromUid,
'to_uid' => $toUid,
'type' => $type,
'msg_type' => 4,
'title' => $message,
'order_no' => isset($order['order_no'])?$order['order_no'] : '',
'description' => isset($order['title'])? $order['title'] : '有新的订单消息',
'content' => json_encode($order,256),
'chat_key' => $chatKey,
'create_time' => time(),
'update_time' => time(),
'is_read' => 2,
'status' => 1
];
if (!$id = MessageModel::insertGetId($msgData)) {
$data = ['success' => false,'op'=>'notice','scene'=>$scene,'data'=>$msgData, 'message' => '消息发送失败'];
$this->sendMsg($frameId, $data);
return false;
}
// 推送消息给对方
$msgData['time_text'] = dateFormat($msgData['create_time']);
$msgData['content'] = $msgData['content']? json_decode($msgData['content'], true) : [];
// 返回自身消息
$this->sendMsg($frameId, ['success' => true, 'op' => 'notice', 'scene'=> $scene,'custom_fd'=>$customFd, 'data' => $msgData, 'message' => '发送成功:' . $frameId]);
// 推送给对方消息
$toBindData = RedisService::get("chats:bind:{$scene}_{$toUid}");
$toFd = isset($toBindData['fd']) ? $toBindData['fd'] : 0;
echo $toFd;
if ($toBindData && $toFd) {
$this->sendMsg($toFd, ['success' => true, 'op' => 'notice' ,'scene'=> $scene,'custom_fd'=>$customFd, 'data' => ['count'=>1,'list'=>[$msgData]], 'message' => '推送消息成功:' . $toFd]);
$this->info("【{$date}】Message:客户端【{$frameId}-{$fromUid}】推送消息给【{$toFd}-{$toUid}。");
}
break;
case 'login': // 登录
if($toUid<=0){
$toUid = 1;
$data['to_uid'] = $toUid;
$data['is_custom'] = true;
$data['to_user_name'] = '客服';
}
// 未读消息
$unreadCount = 0;
if($fromUid==1){
$unreadCount = intval(\App\Services\Api\MessageService::make()->getUnreadCount(1));
}
$fdData = RedisService::get("chats:bind:chat_".$toUid);
$toFd = $fdData && isset($fdData['fd'])? $fdData['fd'] : 0;
$checkFd = RedisService::get("chats:frames:" . $toFd);
$online = $checkFd && $toFd? 1 : 0;
$data['to_fd'] = $online? $toFd : 0;
$data['chat_key'] = getChatKey($fromUid,$toUid);
$this->info("【{$scene} {$date}】Socket:登录成功【{$frameId}-{$fromUid}-{$op}】。");
$this->sendMsg($frameId, ['success' => true,'op'=> $op, 'scene'=>$scene,'custom_fd'=>$customFd,'unread'=>$unreadCount, 'message' => '登录成功', 'data' => $data, 't' => time()]);
break;
default:
$this->sendMsg($frameId, ['success' => false, 'message' => 'ok', 'scene'=>$scene,'custom_fd'=>$customFd, 'data' => $data, 't' => time()]);
break;
}
$this->info("【{$scene} {$date}】Chat:客户端【{$frameId}】消息处理成功");
} catch (\Exception $exception) {
RedisService::set("caches:sockets:error_{$frameId}", ['error' => $exception->getMessage(),'trace'=>$exception->getTrace(), 'date' => $date], 7200);
$this->info("【{$scene} {$date}】Chat:客户端【{$frameId}】消息处理错误 " . $exception->getMessage());
}
}
/**
* 签名验证
* @param $data
* @return bool
*/
public function checkSign($data)
{
$checkSign = isset($data['sign']) ? $data['sign'] : '';
$sign = getSign($data);
if ($sign != $checkSign) {
return false;
}
return true;
}
/**
* 推送消息
* @param $fd
* @param $op
* @param $data
*/
public function sendMsg($fd, $data)
{
$date = date('Y-m-d H:i:s');
try {
if (!RedisService::exists("chats:frames:" . $fd)) {
$this->info("【{$date}】Chat:客户端【{$fd}】推送用户已经掉线 ");
return false;
}
$this->ws->push($fd, json_encode($data, 256));
} catch (\Exception $exception) {
$this->info("【{$date}】Chat:客户端【{$fd}】消息处理错误 " . $exception->getMessage());
}
}
/**
* 接收请求
* @param $request
* @param $response
*/
public function request($request, $response)
{
}
/**
* 关闭连接
* @param $ws
* @param $fd
*/
public function close($ws, $fd = '')
{
$date = date('Y-m-d H:i:s');
RedisService::clear("chats:frames:" . $fd);
$this->info("【{$date}】Chat:客户端【{$fd}】连接关闭");
RedisService::clear('chats:checkCustom:from_'.$fd);
$this->ws->close($fd);
// 清理历史消息
if(!RedisService::get("caches:messages:clear")){
$chatLogCount = ConfigService::make()->getConfigByCode('chat_log_count',300);
$expireTime = ConfigService::make()->getConfigByCode('chat_log_expire',30);
$expireTime = $expireTime>0 && $expireTime <= 365? $expireTime : 30;
$expireTime = $expireTime * 24 * 3600;
if(MessageModel::count('id')>$chatLogCount){
MessageModel::where('create_time','<', time() - $expireTime)->delete();
}
}
}
/**
* 停止运行
*/
public function stop()
{
// 直接杀
$stoSh = base_path().'/crontab/socketStop.sh';
if(file_exists($stoSh) && function_exists('exec')){
exec("{$stoSh}");
}
echo "$stoSh\n";
echo "chat stop success...\n";
if ($this->ws) {
$date = date('Y-m-d H:i:s');
$this->info("【{$date}】Chat:停止运行服务");
$this->ws->close();
}
}
/**
* 消息
* @param string $data
*/
public function info($data,$verbosity=true)
{
\logger()->channel('chat')->info($data);
if(env('SWOOLE_LOG', true) && $verbosity){
parent::info($data);
}
}
}