SwooleTask.php 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Services\RedisService;
  4. use Illuminate\Console\Command;
  5. use Illuminate\Support\Facades\DB;
  6. class SwooleTask extends Command
  7. {
  8. protected $serv;
  9. protected $host = '127.0.0.1';
  10. protected $port = 6622;
  11. // 进程名称
  12. protected $taskName = 'swooleTask';
  13. // PID路径
  14. protected $pidPath = '/storage/swoole.pid';
  15. // task
  16. protected $onlyReloadTaskWorker = false;
  17. // 设置运行时参数
  18. protected $options = [
  19. 'worker_num' => 8, //worker进程数,一般设置为CPU数的1-4倍
  20. 'daemonize' => true, //启用守护进程
  21. 'log_file' => '/storage/logs/swoole-task.log', //指定swoole错误日志文件
  22. 'log_level' => 0, //日志级别 范围是0-5,0-DEBUG,1-TRACE,2-INFO,3-NOTICE,4-WARNING,5-ERROR
  23. 'dispatch_mode' => 1, //数据包分发策略,1-轮询模式
  24. 'task_worker_num' => 6, //task进程的数量
  25. 'task_ipc_mode' => 3, //使用消息队列通信,并设置为争抢模式
  26. ];
  27. /**
  28. * The name and signature of the console command.
  29. *
  30. * @var string
  31. */
  32. protected $signature = 'swoole:task {op}';
  33. /**
  34. * The console command description.
  35. *
  36. * @var string
  37. */
  38. protected $description = 'Swoole task server description';
  39. /**
  40. * Create a new command instance.
  41. *
  42. * @return void
  43. */
  44. public function __construct()
  45. {
  46. parent::__construct();
  47. }
  48. /**
  49. * 入口
  50. * Execute the console command.
  51. *
  52. * @return mixed
  53. */
  54. public function handle()
  55. {
  56. ini_set("default_socket_timeout", -1);
  57. // 项目根目录
  58. defined('ROOT_PATH') or define('ROOT_PATH', base_path());
  59. // 文件上传目录
  60. defined('ATTACHMENT_PATH') or define('ATTACHMENT_PATH', base_path('public/uploads'));
  61. // 图片上传目录
  62. defined('IMG_PATH') or define('IMG_PATH', base_path('public/uploads/images'));
  63. // 临时存放目录
  64. defined('UPLOAD_TEMP_PATH') or define('UPLOAD_TEMP_PATH', ATTACHMENT_PATH . "/temp");
  65. // 定义普通图片域名
  66. defined('IMG_URL') or define('IMG_URL', env('IMG_URL'));
  67. // 数据表前缀
  68. defined('DB_PREFIX') or define('DB_PREFIX', DB::connection()->getTablePrefix());
  69. $this->options['log_file'] = base_path() . $this->options['log_file'];
  70. $this->pidPath = base_path() . $this->pidPath;
  71. $op = $this->argument('op');
  72. switch ($op) {
  73. case 'status': // 状态
  74. $res = $this->status();
  75. echo $res ? $res : 0;
  76. break;
  77. case 'start': // 运行
  78. return $this->start();
  79. break;
  80. case 'reload': // 平滑重启
  81. return $this->reload();
  82. break;
  83. case 'stop': // 停止运行
  84. return $this->stop();
  85. break;
  86. default:
  87. exit("{$op} command does not exist");
  88. break;
  89. }
  90. }
  91. /**
  92. * 启动
  93. */
  94. public function start()
  95. {
  96. date_default_timezone_set('PRC');
  97. // 构建Server对象,监听对应地址
  98. $this->serv = new \Swoole\Server($this->host, $this->port);
  99. $this->serv->set($this->options);
  100. // 注册事件
  101. $this->serv->on('start', [$this, 'onStart']);
  102. $this->serv->on('receive', [$this, 'onReceive']);
  103. $this->serv->on('task', [$this, 'onTask']);
  104. $this->serv->on('finish', [$this, 'onFinish']);
  105. // Run worker
  106. echo "swoole start...\n";
  107. $this->serv->start();
  108. }
  109. // 安全重启
  110. public function reload()
  111. {
  112. $pids = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
  113. $pids = $pids ? explode("\n", $pids) : [];
  114. $masterPid = isset($pids[0]) ? $pids[0] : '';
  115. $managePid = isset($pids[1]) ? $pids[1] : '';
  116. if (empty($masterPid)) {
  117. return false;
  118. }
  119. if (!$this->status($masterPid)) {
  120. return false;
  121. }
  122. \Swoole\Process::kill($managePid, SIGUSR1);
  123. echo "swoole reload...\n";
  124. }
  125. /**
  126. * 停止
  127. * @param bool $smooth
  128. * @return bool
  129. */
  130. public function stop($smooth = false)
  131. {
  132. $pids = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
  133. $pids = $pids ? explode("\n", $pids) : [];
  134. $masterPid = isset($pids[0]) ? $pids[0] : '';
  135. $managePid = isset($pids[1]) ? $pids[1] : '';
  136. if (empty($masterPid)) {
  137. return false;
  138. }
  139. if (!$this->status($masterPid)) {
  140. return false;
  141. }
  142. // if ($smooth) {
  143. // \Swoole\Process::kill($masterPid, SIGTERM);
  144. // try {
  145. // while (true) {
  146. // \Swoole\Process::kill($masterPid, 0);
  147. // }
  148. // } catch (\Exception $exception) {
  149. //
  150. // }
  151. // } else {
  152. // \Swoole\Process::kill($masterPid, SIGKILL);
  153. // }
  154. //
  155. // if($managePid){
  156. // \Swoole\Process::kill($managePid, SIGKILL);
  157. // }
  158. // 直接杀
  159. $stoSh = base_path().'/crontab/swooleTaskStop.sh';
  160. echo $stoSh;
  161. if(file_exists($stoSh) && function_exists('exec')){
  162. exec("{$stoSh}");
  163. }
  164. @unlink($this->pidPath);
  165. echo "swoole stop...\n";
  166. }
  167. /**
  168. * 状态
  169. * @return mixed
  170. */
  171. public function status($masterPid = 0)
  172. {
  173. $res = false;
  174. if (empty($masterPid) && file_exists($this->pidPath)) {
  175. $pids = file_get_contents($this->pidPath);
  176. $pids = $pids ? explode("\n", $pids) : [];
  177. $masterPid = isset($pids[0]) ? $pids[0] : '';
  178. }
  179. if ($masterPid) {
  180. $res = \Swoole\Process::kill($masterPid, 0);
  181. }
  182. return $res;
  183. }
  184. public function onStart($serv)
  185. {
  186. if (!is_dir(dirname($this->pidPath))) {
  187. @mkdir(dirname($this->pidPath), true, 755);
  188. }
  189. //记录进程id,脚本实现自动重启
  190. $pid = "{$serv->master_pid}\n{$serv->manager_pid}";
  191. file_put_contents($this->pidPath, $pid);
  192. // 定时任务
  193. $time = 0;
  194. $date = date('Y-m-d H:i:s');
  195. if(file_exists($this->options['log_file'])){
  196. $time = 0;
  197. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  198. }
  199. \swoole_timer_tick(300000, function ($timer) use ($serv, &$time) { // 启用定时器,每300秒执行一次
  200. $date = date('Y-m-d H:i:s');
  201. if($time>3600 && file_exists($this->options['log_file'])){
  202. $time = 0;
  203. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  204. }
  205. $time++;
  206. if(!RedisService::get('caches:task:lock:music_loaded')){
  207. $taskData = [
  208. 'taskName' => 'bgmMusic',
  209. 'name' => "背景音乐写表处理",
  210. 'date' => date('Y-m-d'),
  211. ];
  212. $res = $serv->task($taskData);
  213. RedisService::set('caches:task:lock:music_loaded', true, rand(3,5));
  214. echo "[Task bgmMusic {$date}] 背景音乐写表处理:{$res}\n";
  215. }else{
  216. echo "[Task bgmMusic {$date}] 间隔时间调用\n";
  217. }
  218. });
  219. }
  220. //监听连接进入事件
  221. public function onConnect($serv, $fd, $from_id)
  222. {
  223. $serv->send($fd, "Success {$fd}!");
  224. }
  225. // 监听数据接收事件
  226. public function onReceive(\Swoole\Server $serv, $fd, $from_id, $data)
  227. {
  228. echo "Get Message From Client {$fd}:{$data}\n";
  229. $res['result'] = 'success';
  230. $serv->send($fd, json_encode($res)); // 同步返回消息给客户端
  231. $serv->task($data); // 执行异步任务
  232. }
  233. /**
  234. * @param \Swoole\Server $serv
  235. * @param $task_id
  236. * @param $from_id
  237. * @param $data
  238. * @return false|string
  239. */
  240. public function onTask(\Swoole\Server $serv, $task_id, $from_id, $data)
  241. {
  242. $date = date('Y-m-d H:i:s');
  243. $taskName = isset($data['taskName']) ? $data['taskName'] : '';
  244. switch ($taskName) {
  245. case 'OrderSettle': // 订单结算
  246. // 时间限制
  247. if(date('H:i') >= env('SETTLE_TIME','06:00')){
  248. echo "[Task {$taskName} {$date}] 不在运行时间段内\n";
  249. return false;
  250. }
  251. // 调用处理
  252. if($res = WalletService::make()->orderSettle()){
  253. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  254. echo "[Task {$taskName} {$date}] 订单结算结果:{$res}\n";
  255. }else{
  256. $error = BonusService::make()->getError();
  257. $error = $error? lang($error) : 'failed';
  258. echo "[Task {$taskName} {$date}] 订单结算结果:{$error}\n";
  259. }
  260. break;
  261. case 'bgmMusic':
  262. break;
  263. }
  264. return '暂无任务处理';
  265. }
  266. /**
  267. * @param $serv swoole_server swoole_server对象
  268. * @param $task_id int 任务id
  269. * @param $data string 任务返回的数据
  270. */
  271. public function onFinish(\Swoole\Server $serv, $task_id, $data)
  272. {
  273. //
  274. echo "任务处理完成...\n";
  275. }
  276. // 监听连接关闭事件
  277. public function onClose($serv, $fd, $from_id)
  278. {
  279. echo "Client {$fd} close connection\n";
  280. $serv->close();
  281. }
  282. }