SocketServer.php 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Services\Common\MemberService;
  4. use App\Services\RedisService;
  5. use Illuminate\Console\Command;
  /**
   * Artisan command `swoole:socket {op?}` that runs a Swoole WebSocket server.
   *
   * The server keeps a registry of connected client fds in Redis under
   * `caches:sockets:fds`, answers "ping" heartbeats, stores every JSON message
   * under `caches:{uuid}:{op}`, and broadcasts `login-bind` payloads to all
   * registered clients as `login-load` messages.
   *
   * NOTE(review): every Redis read-modify-write below goes through
   * RedisService::get()/set(), whose implementation is not visible here; it is
   * presumably not atomic across Swoole workers - confirm before relying on the
   * registry being exact.
   */
  class SocketServer extends Command
  {
      /**
       * Swoole WebSocket server instance; only assigned by start().
       *
       * @var \swoole_websocket_server|null
       */
      public $ws;

      /**
       * The name and signature of the console command.
       *
       * @var string
       */
      protected $signature = 'swoole:socket {op?}';

      /**
       * The console command description.
       *
       * @var string
       */
      protected $description = 'websocket server run';

      /**
       * Values a client may not use as `uuid`, because `caches:{uuid}:{op}` would
       * then collide with keys this class owns (`caches:sockets:*`,
       * `caches:binds:login`, `caches:server:{fd}`).
       *
       * NOTE(review): only keys visible in this file are protected; other
       * `caches:*` keys used elsewhere in the project are not covered.
       *
       * @var string[]
       */
      private static $reservedKeyParts = ['sockets', 'binds', 'server'];

      /**
       * Create a new command instance.
       *
       * @return void
       */
      public function __construct()
      {
          parent::__construct();
      }

      /**
       * Execute the console command.
       *
       * Dispatches on the optional `op` argument ("start" when omitted).
       *
       * @return int 0 for a recognised operation, 1 for an unknown one
       */
      public function handle()
      {
          $op = $this->argument('op') ?: 'start';

          if ($op === 'start') {
              echo "swoole socket service start ...\n";
              $this->start();
              return 0;
          }

          if ($op === 'stop') {
              echo "swoole socket service stop ...\n";
              $this->stop();
              return 0;
          }

          // Previously an unknown operation was silently ignored.
          $this->error("Unknown operation \"{$op}\"; expected \"start\" or \"stop\".");
          return 1;
      }

      /**
       * Create the WebSocket server, register the event callbacks and run it.
       *
       * Listens on 0.0.0.0 at the port given by the SOCKET_PORT env value
       * (default 8650). Any exception is recorded in Redis under
       * `caches:sockets:error` for 600 seconds and logged to the console.
       *
       * @return void
       */
      public function start()
      {
          try {
              // The port is cast explicitly: env() returns a string when the value is set.
              $port = (int) env('SOCKET_PORT', '8650');
              $this->ws = new \swoole_websocket_server('0.0.0.0', $port);

              // Connection opened.
              $this->ws->on('open', [$this, 'open']);
              // Message frame received.
              $this->ws->on('message', [$this, 'message']);
              // Plain HTTP request received.
              $this->ws->on('request', [$this, 'request']);
              // Connection closed.
              $this->ws->on('close', [$this, 'close']);

              $this->ws->start();
          } catch (\Exception $exception) {
              $date = date('Y-m-d H:i:s');
              RedisService::set('caches:sockets:error', $exception->getMessage(), 600);
              $this->info("【{$date}】Socket:运行异常=》" . $exception->getMessage());
          }
      }

      /**
       * Handle a newly opened connection.
       *
       * Adds the fd to the `caches:sockets:fds` registry (24 hour TTL) and sends
       * the client a `conn` acknowledgement containing its fd.
       *
       * @param mixed $ws      server instance passed by Swoole (unused; $this->ws is used)
       * @param mixed $request handshake request; only its `fd` property is read
       * @return void
       */
      public function open($ws, $request)
      {
          $date = date('Y-m-d H:i:s');

          // Anything that is not an array (missing key, corrupted value) restarts the registry.
          $fds = RedisService::get('caches:sockets:fds');
          if (!is_array($fds)) {
              $fds = [];
          }
          $fds[] = $request->fd;

          // array_unique() keeps the original keys; reindex so the value stays a plain list.
          RedisService::set('caches:sockets:fds', array_values(array_unique($fds)), 86400);

          $this->ws->push(
              $request->fd,
              json_encode(['op' => 'conn', 'message' => '连接成功', 'fd' => $request->fd], JSON_UNESCAPED_UNICODE)
          );
          $this->info("【{$date}】Socket:客户端【{$request->fd}】连接成功");
      }

      /**
       * Handle an incoming message frame.
       *
       * A literal "ping" is answered with "pong". Any other frame is decoded as
       * JSON with the optional fields `op`, `uuid` and `data`, stored in Redis
       * under `caches:{uuid}:{op}` for seven days, and acknowledged with
       * `{"op": ..., "message": "ok", "t": ...}`. `login-bind` additionally
       * updates `caches:binds:login` and broadcasts to all registered clients.
       *
       * @param mixed $ws    server instance passed by Swoole (unused; $this->ws is used)
       * @param mixed $frame received frame; its `fd` and `data` properties are read
       * @return false|null false after a heartbeat, null otherwise
       */
      public function message($ws, $frame)
      {
          try {
              $date = date('Y-m-d H:i:s');
              $fromId = $frame->fd;

              // Heartbeat: reply and stop, nothing is persisted.
              if ($frame->data === 'ping') {
                  $this->ws->push($fromId, 'pong');
                  $this->info("【{$date}】Socket:客户端【{$fromId}】心跳包");
                  return false;
              }

              // Invalid JSON or a JSON scalar decodes to a non-array; treat it as an empty message.
              $data = $frame->data ? json_decode($frame->data, true) : [];
              if (!is_array($data)) {
                  $data = [];
              }

              // `op` and `uuid` end up inside a Redis key, so only scalars are accepted.
              $op = $this->scalarField($data, 'op', '');
              $uuid = $this->scalarField($data, 'uuid', null);
              if ($uuid === null || in_array((string) $uuid, self::$reservedKeyParts, true)) {
                  $uuid = uniqid();
              }
              $fromData = isset($data['data']) ? $data['data'] : '';

              $ttl = 7 * 24 * 3600;
              $key = "caches:{$uuid}:{$op}";

              // Strict comparison: with a loose switch, `op: 0` or `op: true` matched 'bind' on PHP 7.
              if ($op === 'bind') {
                  RedisService::set(
                      $key,
                      ['uuid' => $uuid, 'qrcode' => $fromData, 'formId' => $fromId, 'date' => $date],
                      $ttl
                  );
              } elseif ($op === 'login-bind') {
                  $record = ['uuid' => $uuid, 'data' => $fromData, 'formId' => $fromId, 'date' => $date];
                  RedisService::set('caches:binds:login', $record, $ttl);
                  RedisService::set($key, $record, $ttl);
                  $this->broadcastLogin($uuid, $fromId, $fromData, $date);
              } else {
                  RedisService::set(
                      $key,
                      ['uuid' => $uuid, 'data' => $fromData, 'formId' => $fromId, 'date' => $date],
                      $ttl
                  );
              }

              $this->ws->push(
                  $fromId,
                  json_encode(['op' => $op, 'message' => 'ok', 't' => time()], JSON_UNESCAPED_UNICODE)
              );
              $this->info("【{$date}】Socket:客户端【{$fromId}】消息处理成功");
          } catch (\Exception $exception) {
              $this->ws->push(
                  $frame->fd,
                  json_encode(['message' => $exception->getMessage()], JSON_UNESCAPED_UNICODE)
              );
          }
      }

      /**
       * Handle a plain HTTP request. Intentionally a no-op.
       *
       * @param mixed $request
       * @param mixed $response
       * @return void
       */
      public function request($request, $response)
      {
      }

      /**
       * Push a message to one client.
       *
       * @param mixed $fd   target connection fd
       * @param mixed $data payload handed to the server's push()
       * @return bool result of push(); false when no server exists or push() throws
       */
      public function sendMsg($fd, $data)
      {
          if (!$this->ws) {
              return false;
          }

          try {
              return (bool) $this->ws->push($fd, $data);
          } catch (\Exception $exception) {
              return false;
          }
      }

      /**
       * Handle a closed connection.
       *
       * Deletes `caches:server:{fd}` and removes the fd from the
       * `caches:sockets:fds` registry that open() appends to, so broadcasts do
       * not keep iterating over dead connections.
       *
       * The server's close() is deliberately not called here: this method is the
       * close-event callback, so the connection is already being closed.
       *
       * @param mixed $ws server instance passed by Swoole (unused)
       * @param mixed $fd fd of the connection that closed
       * @return void
       */
      public function close($ws, $fd = '')
      {
          $date = date('Y-m-d H:i:s');
          RedisService::keyDel("caches:server:" . $fd);

          $fds = RedisService::get('caches:sockets:fds');
          if (is_array($fds)) {
              // Compare as strings so an int fd matches a value that was stored as a string.
              $remaining = array_filter($fds, function ($known) use ($fd) {
                  return !(is_scalar($known) && (string) $known === (string) $fd);
              });
              RedisService::set('caches:sockets:fds', array_values($remaining), 86400);
          }

          $this->info("【{$date}】Socket:客户端【{$fd}】连接关闭");
      }

      /**
       * Shut the server down.
       *
       * Only effective in the process where start() created the server: $ws is
       * assigned nowhere else, so a separate `swoole:socket stop` invocation has
       * no server handle and just reports that.
       *
       * @return void
       */
      public function stop()
      {
          $date = date('Y-m-d H:i:s');

          if (!$this->ws) {
              $this->info("【{$date}】Socket:当前进程没有运行中的服务");
              return;
          }

          $this->info("【{$date}】Socket:停止运行服务");
          // shutdown() stops the server; close() closes a single connection and needs an fd.
          $this->ws->shutdown();
      }

      /**
       * Broadcast a `login-load` message for a `login-bind` request.
       *
       * Stores a snapshot of the server's current connections under `caches:fds`
       * (600 seconds), then pushes the payload to every registered fd that is an
       * established WebSocket connection.
       *
       * @param mixed  $uuid     message uuid echoed to the receivers
       * @param mixed  $fromId   fd of the sender, sent as `fd`
       * @param mixed  $fromData sender's `data` field; expected to be a map of JSON strings
       * @param string $date     timestamp used in the console log lines
       * @return void
       */
      private function broadcastLogin($uuid, $fromId, $fromData, $date)
      {
          // `connections` is an iterator object, so store a plain list of its values instead.
          $connections = $this->ws->connections;
          $snapshot = $connections instanceof \Traversable
              ? iterator_to_array($connections, false)
              : (array) $connections;
          RedisService::set('caches:fds', $snapshot, 600);

          $fds = RedisService::get('caches:sockets:fds');
          if (!is_array($fds) || !$fds) {
              return;
          }

          // Each entry becomes {key, value}; scalar values are decoded from JSON,
          // anything else is forwarded unchanged rather than passed to json_decode().
          $localData = [];
          if (is_array($fromData)) {
              foreach ($fromData as $k => $v) {
                  $localData[] = [
                      'key' => $k,
                      'value' => is_scalar($v) ? json_decode((string) $v, true) : $v,
                  ];
              }
          }

          // The payload is identical for every receiver, so encode it once.
          $payload = json_encode(
              ['op' => 'login-load', 'uuid' => $uuid, 'fd' => $fromId, 'data' => $localData, 't' => time()],
              JSON_UNESCAPED_UNICODE
          );

          foreach ($fds as $fd) {
              if (!is_numeric($fd)) {
                  continue;
              }
              $fd = (int) $fd;

              // Only push to connections that completed the WebSocket handshake; push may fail otherwise.
              if ($this->ws->isEstablished($fd)) {
                  $this->ws->push($fd, $payload);
                  $this->info("【{$date}】Socket:客户端【{$fd}】推送广播消息");
              }
          }
      }

      /**
       * Read a scalar field from a decoded message.
       *
       * @param array  $data    decoded message
       * @param string $key     field name
       * @param mixed  $default returned when the field is missing, null or not a scalar
       * @return mixed
       */
      private function scalarField(array $data, $key, $default)
      {
          if (!isset($data[$key]) || !is_scalar($data[$key])) {
              return $default;
          }

          return $data[$key];
      }
  }