| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- <?php
- namespace App\Console\Commands;
- use App\Services\Common\MemberService;
- use App\Services\RedisService;
- use Illuminate\Console\Command;
/**
 * Artisan command that runs a Swoole WebSocket server.
 *
 * Usage: `php artisan swoole:socket [start|stop]` (defaults to `start`).
 *
 * The server keeps a list of connected client fds in Redis, answers `ping`
 * heartbeats, stores every other JSON message in Redis under a key derived
 * from its `uuid` and `op`, and rebroadcasts `login-bind` payloads to every
 * tracked client.
 */
class SocketServer extends Command
{
    /** Redis key holding the list of connected client fds. */
    const FDS_KEY = 'caches:sockets:fds';

    /** Lifetime (seconds) of the fd list; refreshed on every write. */
    const FDS_TTL = 86400;

    /** Lifetime (seconds) of stored client messages. */
    const MESSAGE_TTL = 7 * 24 * 3600;

    /**
     * The running WebSocket server; null until start() has been called.
     *
     * @var \swoole_websocket_server|null
     */
    public $ws;

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'swoole:socket {op?}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'websocket server run';

    /**
     * Execute the console command.
     *
     * @return int 0 on success, 1 when the requested operation is unknown
     */
    public function handle()
    {
        $op = $this->argument('op') ?: 'start';

        switch ($op) {
            case 'start':
                echo "swoole socket service start ...\n";
                $this->start();
                return 0;

            case 'stop':
                echo "swoole socket service stop ...\n";
                $this->stop();
                return 0;

            default:
                // Previously an unknown operation was silently ignored.
                $this->error("Unknown operation [{$op}]; expected \"start\" or \"stop\".");
                return 1;
        }
    }

    /**
     * Create the WebSocket server, register the event handlers and run it.
     *
     * This call blocks until the server shuts down. Any failure is logged to
     * the console and recorded in Redis under `caches:sockets:error`.
     */
    public function start()
    {
        try {
            // Listen on all interfaces; the port comes from SOCKET_PORT (default 8650).
            $this->ws = new \swoole_websocket_server('0.0.0.0', $this->port());

            // Record the master PID so a separate `stop` invocation can find the server.
            $this->ws->set(['pid_file' => $this->pidFile()]);

            $this->ws->on('open', [$this, 'open']);
            $this->ws->on('message', [$this, 'message']);
            $this->ws->on('request', [$this, 'request']);
            $this->ws->on('close', [$this, 'close']);

            $this->ws->start();
        } catch (\Throwable $exception) {
            RedisService::set('caches:sockets:error', $exception->getMessage(), 600);
            $this->logEvent('运行异常=》' . $exception->getMessage());
        }
    }

    /**
     * Handle a newly opened WebSocket connection.
     *
     * Adds the client's fd to the Redis fd list and sends it a `conn`
     * acknowledgement containing that fd.
     *
     * @param $ws      the server instance passed by Swoole
     * @param $request the handshake request; only its `fd` is used
     */
    public function open($ws, $request)
    {
        $fds = $this->storedFds();
        $fds[] = $request->fd;

        // array_unique() keeps the original keys; reindex so the value stays a list.
        RedisService::set(self::FDS_KEY, array_values(array_unique($fds)), self::FDS_TTL);

        $this->ws->push(
            $request->fd,
            json_encode(['op' => 'conn', 'message' => '连接成功', 'fd' => $request->fd], JSON_UNESCAPED_UNICODE)
        );
        $this->logEvent("客户端【{$request->fd}】连接成功");
    }

    /**
     * Handle an incoming WebSocket frame.
     *
     * A literal `ping` frame is answered with `pong`. Any other frame is
     * decoded as JSON with the optional fields `op`, `uuid` and `data`, stored
     * in Redis, and acknowledged with `{"op": ..., "message": "ok"}`. For
     * `login-bind` the payload is additionally broadcast to all tracked clients.
     * If handling fails, the error message is pushed back to the sender.
     *
     * @param $ws    the server instance passed by Swoole
     * @param $frame the received frame; `fd` and `data` are used
     * @return false|void false after answering a heartbeat, nothing otherwise
     */
    public function message($ws, $frame)
    {
        try {
            $date = date('Y-m-d H:i:s');

            if ($frame->data === 'ping') {
                $this->ws->push($frame->fd, 'pong');
                $this->logEvent("客户端【{$frame->fd}】心跳包", $date);
                return false;
            }

            $data = $frame->data ? json_decode($frame->data, true) : [];
            if (!is_array($data)) {
                // Invalid JSON or a bare scalar: treat it as a message without fields.
                $data = [];
            }

            $fromId = $frame->fd;
            $op = $this->keyPart($data['op'] ?? '', '');
            $uuid = $this->keyPart($data['uuid'] ?? null, uniqid());
            $fromData = $data['data'] ?? '';

            // NOTE(review): $uuid and $op are client-supplied and form the Redis key,
            // so a client can address any "caches:<a>:<b>" key (including the fd list).
            // Consider a dedicated key prefix once all readers of these keys are known.
            $key = "caches:{$uuid}:{$op}";
            $record = ['uuid' => $uuid, 'data' => $fromData, 'formId' => $fromId, 'date' => $date];

            switch ($op) {
                case 'bind':
                    RedisService::set(
                        $key,
                        ['uuid' => $uuid, 'qrcode' => $fromData, 'formId' => $fromId, 'date' => $date],
                        self::MESSAGE_TTL
                    );
                    break;

                case 'login-bind':
                    RedisService::set('caches:binds:login', $record, self::MESSAGE_TTL);
                    RedisService::set($key, $record, self::MESSAGE_TTL);
                    RedisService::set('caches:fds', $this->connectedFds(), 600);
                    $this->broadcastLogin($uuid, $fromId, $fromData, $date);
                    break;

                default:
                    RedisService::set($key, $record, self::MESSAGE_TTL);
                    break;
            }

            $this->ws->push($fromId, json_encode(['op' => $op, 'message' => 'ok', 't' => time()]));
            $this->logEvent("客户端【{$fromId}】消息处理成功", $date);
        } catch (\Throwable $exception) {
            // \Throwable rather than \Exception so that type errors raised while
            // processing malformed payloads are reported to the client as well.
            $this->ws->push(
                $frame->fd,
                json_encode(['message' => $exception->getMessage()], JSON_UNESCAPED_UNICODE)
            );
        }
    }

    /**
     * Handler registered for the server's `request` event.
     *
     * Intentionally empty: requests are accepted but nothing is written to the
     * response here.
     *
     * @param $request
     * @param $response
     */
    public function request($request, $response)
    {
    }

    /**
     * Push a message to one client.
     *
     * @param $fd   target connection fd
     * @param $data payload to push
     * @return bool true when the push was accepted; false when the server is
     *              not running, the fd is not an established WebSocket
     *              connection, or the push failed
     */
    public function sendMsg($fd, $data)
    {
        if (!$this->ws || !$this->ws->isEstablished($fd)) {
            return false;
        }

        try {
            return (bool) $this->ws->push($fd, $data);
        } catch (\Throwable $exception) {
            return false;
        }
    }

    /**
     * Handle a closed connection.
     *
     * Deletes the `caches:server:<fd>` key and removes the fd from the Redis
     * fd list. The connection is already closed when this event fires, so the
     * server's close() is deliberately not called again.
     *
     * @param $ws the server instance passed by Swoole
     * @param $fd fd of the connection that was closed
     */
    public function close($ws, $fd = '')
    {
        RedisService::keyDel('caches:server:' . $fd);
        $this->forgetFd($fd);
        $this->logEvent("客户端【{$fd}】连接关闭");
    }

    /**
     * Stop the server.
     *
     * When the server object lives in this process it is shut down directly.
     * Otherwise (the usual `swoole:socket stop` case, which runs in a new
     * process) the master PID recorded by start() is read from the PID file
     * and sent the default termination signal.
     */
    public function stop()
    {
        if ($this->ws) {
            $this->logEvent('停止运行服务');
            $this->ws->shutdown();
            return;
        }

        $pidFile = $this->pidFile();
        $pid = is_file($pidFile) ? (int) trim((string) file_get_contents($pidFile)) : 0;

        // Signal 0 only probes for existence; it guards against a stale PID file.
        if ($pid <= 0 || !\swoole_process::kill($pid, 0)) {
            $this->error('swoole socket service is not running');
            return;
        }

        $this->logEvent('停止运行服务');
        \swoole_process::kill($pid);
    }

    /**
     * Port the server listens on, taken from SOCKET_PORT (default 8650).
     *
     * @return int
     */
    private function port()
    {
        return (int) env('SOCKET_PORT', '8650');
    }

    /**
     * Path of the file holding the server's master PID, unique per port.
     *
     * @return string
     */
    private function pidFile()
    {
        return sys_get_temp_dir() . DIRECTORY_SEPARATOR . 'swoole-socket-' . $this->port() . '.pid';
    }

    /**
     * Write a timestamped line to the console.
     *
     * @param string      $message text placed after the "Socket:" prefix
     * @param string|null $date    timestamp to print; defaults to the current time
     */
    private function logEvent($message, $date = null)
    {
        $date = $date ?: date('Y-m-d H:i:s');
        $this->info("【{$date}】Socket:{$message}");
    }

    /**
     * Read the tracked fd list from Redis.
     *
     * @return array list of fds; empty when nothing usable is cached
     */
    private function storedFds()
    {
        $fds = RedisService::get(self::FDS_KEY);

        return is_array($fds) ? array_values($fds) : [];
    }

    /**
     * Remove one fd from the tracked fd list, writing only when it was present.
     *
     * @param $fd fd to remove
     */
    private function forgetFd($fd)
    {
        $fds = $this->storedFds();
        $remaining = array_values(array_diff($fds, [$fd]));

        if (count($remaining) !== count($fds)) {
            RedisService::set(self::FDS_KEY, $remaining, self::FDS_TTL);
        }
    }

    /**
     * Collect the server's current connection fds into a plain array.
     *
     * The server exposes its connections as an iterator object, which is
     * copied into a list here so it can be cached as ordinary data.
     *
     * @return array
     */
    private function connectedFds()
    {
        $fds = [];
        foreach ($this->ws->connections as $fd) {
            $fds[] = $fd;
        }

        return $fds;
    }

    /**
     * Normalise a client-supplied value used as part of a Redis key.
     *
     * @param mixed  $value   raw value from the decoded message
     * @param string $default used when the value is missing or not a scalar
     * @return string
     */
    private function keyPart($value, $default)
    {
        return is_scalar($value) ? (string) $value : $default;
    }

    /**
     * Broadcast a `login-load` message to every tracked, established client.
     *
     * Each entry of $fromData becomes a `{key, value}` pair; string values are
     * JSON-decoded, other values are forwarded unchanged.
     *
     * @param string $uuid     message uuid
     * @param        $fromId   fd of the client that sent `login-bind`
     * @param mixed  $fromData the message's `data` field
     * @param string $date     timestamp used for the log lines
     */
    private function broadcastLogin($uuid, $fromId, $fromData, $date)
    {
        $fds = $this->storedFds();
        if (!$fds) {
            return;
        }

        $localData = [];
        if (is_array($fromData)) {
            foreach ($fromData as $k => $v) {
                $localData[] = [
                    'key' => $k,
                    'value' => is_string($v) ? json_decode($v, true) : $v,
                ];
            }
        }

        $payload = json_encode(
            ['op' => 'login-load', 'uuid' => $uuid, 'fd' => $fromId, 'data' => $localData, 't' => time()],
            JSON_UNESCAPED_UNICODE
        );

        foreach ($fds as $fd) {
            // Only push to fds that are established WebSocket connections; the
            // cached list may contain fds that have since gone away.
            if (is_numeric($fd) && $this->ws->isEstablished((int) $fd)) {
                $this->ws->push((int) $fd, $payload);
                $this->logEvent("客户端【{$fd}】推送广播消息", $date);
            }
        }
    }
}
|