SwooleTask.php 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Services\Api\FinanceService;
  4. use App\Services\Api\GoodsService;
  5. use App\Services\RedisService;
  6. use Illuminate\Console\Command;
  7. use Illuminate\Support\Facades\DB;
  8. class SwooleTask extends Command
  9. {
  /** @var \Swoole\Server|null Server instance; assigned in start(). */
  protected $serv;

  /** @var string Address the server listens on. */
  protected $host = '127.0.0.1';

  /** @var int Port the server listens on. */
  protected $port = 6622;

  /** @var string Process name. */
  protected $taskName = 'swooleTask';

  /** @var string PID file path; handle() prefixes it with base_path(). */
  protected $pidPath = '/storage/swoole.pid';

  /** @var bool Flag related to reloading only the task workers. */
  protected $onlyReloadTaskWorker = false;

  /**
   * Runtime settings handed to the server in start().
   *
   * @var array
   */
  protected $options = [
      // Number of worker processes; commonly 1-4x the CPU count.
      'worker_num' => 8,
      // Run as a daemon.
      'daemonize' => true,
      // Swoole error log file; handle() prefixes it with base_path().
      'log_file' => '/storage/logs/swoole-task.log',
      // Log level, range 0-5: 0-DEBUG, 1-TRACE, 2-INFO, 3-NOTICE, 4-WARNING, 5-ERROR.
      'log_level' => 0,
      // Packet dispatch strategy: 1 = round-robin.
      'dispatch_mode' => 1,
      // Number of task processes.
      'task_worker_num' => 6,
      // Communicate through a message queue, in contention mode.
      'task_ipc_mode' => 3,
  ];

  /**
   * The name and signature of the console command.
   *
   * @var string
   */
  protected $signature = 'swoole:task {op}';

  /**
   * The console command description.
   *
   * @var string
   */
  protected $description = 'Swoole task server description';
  /**
   * Build the command instance.
   *
   * Nothing is initialised here beyond what the parent constructor does.
   */
  public function __construct()
  {
      parent::__construct();
  }
  /**
   * Command entry point.
   *
   * Defines the path/URL constants (only when not already defined), resolves
   * the log file and PID file against the project root, then dispatches the
   * {op} argument to status(), start(), reload() or stop().
   *
   * @return mixed Result of the selected operation; 1 when {op} is not recognised.
   */
  public function handle()
  {
      // A negative default_socket_timeout means socket-based streams never time out.
      ini_set("default_socket_timeout", -1);

      // Project root directory.
      defined('ROOT_PATH') or define('ROOT_PATH', base_path());
      // File upload directory.
      defined('ATTACHMENT_PATH') or define('ATTACHMENT_PATH', base_path('public/uploads'));
      // Image upload directory.
      defined('IMG_PATH') or define('IMG_PATH', base_path('public/uploads/images'));
      // Temporary upload directory.
      defined('UPLOAD_TEMP_PATH') or define('UPLOAD_TEMP_PATH', ATTACHMENT_PATH . "/temp");
      // Domain used for regular images.
      defined('IMG_URL') or define('IMG_URL', env('IMG_URL'));
      // Database table prefix.
      defined('DB_PREFIX') or define('DB_PREFIX', DB::connection()->getTablePrefix());

      // Both paths are declared relative to the project root.
      $this->options['log_file'] = base_path() . $this->options['log_file'];
      $this->pidPath = base_path() . $this->pidPath;

      $op = $this->argument('op');
      switch ($op) {
          case 'status':
              $res = $this->status();
              echo $res ? $res : 0;
              break;
          case 'start':
              return $this->start();
          case 'reload':
              return $this->reload();
          case 'stop':
              return $this->stop();
          default:
              // exit("...") would end the process with status 0 and skip the
              // framework's shutdown; report the error and return a failure code.
              $this->error("{$op} command does not exist");
              return 1;
      }
  }
  /**
   * Create the server, apply the runtime options, register the event
   * callbacks and start it.
   */
  public function start()
  {
      date_default_timezone_set('PRC');

      // Bind the server to the configured address.
      $server = new \Swoole\Server($this->host, $this->port);
      $this->serv = $server;
      $server->set($this->options);

      // Event name => handler method on this command.
      $handlers = [
          'start' => 'onStart',
          'receive' => 'onReceive',
          'task' => 'onTask',
          'finish' => 'onFinish',
      ];
      foreach ($handlers as $event => $method) {
          $server->on($event, [$this, $method]);
      }

      echo "swoole start...\n";
      $server->start();
  }
  /**
   * Gracefully reload the running server's worker processes.
   *
   * Reads the master and manager PIDs from the PID file and signals the
   * manager process, falling back to the master when no manager PID was
   * recorded. SIGUSR1 is sent by default; SIGUSR2 is sent instead when
   * $onlyReloadTaskWorker is true.
   *
   * @return false|null false when no live server is found or the signal could
   *                    not be delivered; null after the signal was sent.
   */
  public function reload()
  {
      $raw = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
      // Normalise to integers so stray whitespace or an empty line never reaches kill().
      $pids = $raw ? array_map('intval', explode("\n", trim($raw))) : [];
      $masterPid = isset($pids[0]) ? $pids[0] : 0;
      $managePid = isset($pids[1]) ? $pids[1] : 0;

      if ($masterPid <= 0 || !$this->status($masterPid)) {
          return false;
      }

      // An empty manager PID would be coerced to 0, and kill(0, ...) addresses
      // the caller's whole process group rather than the server.
      $targetPid = $managePid > 0 ? $managePid : $masterPid;
      $signal = $this->onlyReloadTaskWorker ? SIGUSR2 : SIGUSR1;

      if (!\Swoole\Process::kill($targetPid, $signal)) {
          return false;
      }
      echo "swoole reload...\n";
  }
  /**
   * Stop the running server.
   *
   * Runs crontab/swooleTaskStop.sh (relative to the project root) when that
   * file exists and exec() is available. Otherwise SIGTERM is sent to the
   * master process, so the server is never left running after its PID file
   * has been removed. The PID file is deleted in both cases.
   *
   * @param bool $smooth Currently unused; kept so existing callers keep working.
   * @return false|null false when no live server is found; null after the stop was issued.
   */
  public function stop($smooth = false)
  {
      $raw = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
      $pids = $raw ? explode("\n", trim($raw)) : [];
      $masterPid = isset($pids[0]) ? (int) $pids[0] : 0;

      if ($masterPid <= 0 || !$this->status($masterPid)) {
          return false;
      }

      $stopScript = base_path() . '/crontab/swooleTaskStop.sh';
      if (file_exists($stopScript) && function_exists('exec')) {
          exec($stopScript);
      } else {
          // Without the script nothing would be stopped; ask the master to shut down.
          \Swoole\Process::kill($masterPid, SIGTERM);
      }

      // Best-effort cleanup: the file may already be gone.
      @unlink($this->pidPath);
      echo "swoole stop...\n";
  }
  /**
   * Report whether the server's master process is alive.
   *
   * When no PID is passed, the first line of the PID file is used if the
   * file exists.
   *
   * @param int|string $masterPid Master PID to probe; empty to read it from the PID file.
   * @return mixed false when no PID is known; otherwise the result of
   *               \Swoole\Process::kill() with signal 0.
   */
  public function status($masterPid = 0)
  {
      if (empty($masterPid) && file_exists($this->pidPath)) {
          $contents = file_get_contents($this->pidPath);
          $lines = $contents ? explode("\n", $contents) : [];
          $masterPid = $lines ? $lines[0] : '';
      }

      if (!$masterPid) {
          return false;
      }

      // Signal 0 delivers nothing; it only probes the process.
      return \Swoole\Process::kill($masterPid, 0);
  }
  /**
   * Server "start" callback (registered in start()).
   *
   * Writes the master and manager PIDs to the PID file, resets the log file
   * if it exists, and registers one repeating timer per scheduled job. Each
   * tick dispatches the job to a task worker unless its Redis lock key is
   * currently set.
   *
   * NOTE(review): Swoole's documentation restricts what may run in the start
   * callback; confirm that timers dispatching tasks from here behave as
   * intended on the deployed Swoole version.
   *
   * @param \Swoole\Server $serv The server that has just started.
   */
  public function onStart($serv)
  {
      $pidDir = dirname($this->pidPath);
      if (!is_dir($pidDir)) {
          // mkdir() takes ($path, $mode, $recursive) and the mode must be octal.
          @mkdir($pidDir, 0755, true);
      }

      // Record the process ids so external scripts can manage the server.
      $pid = "{$serv->master_pid}\n{$serv->manager_pid}";
      file_put_contents($this->pidPath, $pid);

      $logFile = $this->options['log_file'];
      $date = date('Y-m-d H:i:s');
      if (file_exists($logFile)) {
          file_put_contents($logFile, "Task {$date}:清空日志\n");
      }

      // Tick counter shared by every timer below; once it exceeds 3600 the
      // next tick truncates the log file and resets it.
      $time = 0;

      // Scheduled jobs, registered in this order. Intervals are in milliseconds
      // (120000 ms = 120 s, 300000 ms = 300 s).
      $schedule = [
          [
              'taskName' => 'UpdateGoods',
              'label' => "更新商品数据",
              'lockKey' => 'caches:task:lock:goods_loaded',
              'interval' => 120000,
          ],
          [
              'taskName' => 'UpdateGoodsCategory',
              'label' => "更新商品分类数据",
              'lockKey' => 'caches:task:lock:goods_category_loaded',
              'interval' => 120000,
          ],
          [
              'taskName' => 'UpdateGoodsSku',
              'label' => "更新商品SKU数据",
              'lockKey' => 'caches:task:lock:goods_sku_loaded',
              'interval' => 120000,
          ],
          [
              'taskName' => 'GrantScore',
              'label' => "每日发放积分",
              'lockKey' => 'caches:task:lock:grant_score_loaded',
              'interval' => 300000,
          ],
          [
              'taskName' => 'ReturnWaitScore',
              'label' => "待返积分每日返还",
              'lockKey' => 'caches:task:lock:waitt_score_loaded',
              'interval' => 300000,
          ],
      ];

      foreach ($schedule as $job) {
          $taskName = $job['taskName'];
          $label = $job['label'];
          $lockKey = $job['lockKey'];

          \swoole_timer_tick($job['interval'], function ($timer) use ($serv, &$time, $logFile, $taskName, $label, $lockKey) {
              $date = date('Y-m-d H:i:s');

              if ($time > 3600 && file_exists($logFile)) {
                  $time = 0;
                  file_put_contents($logFile, "Task {$date}:清空日志\n");
              }
              $time++;

              // Skip this tick while the job's lock key is still set.
              if (RedisService::get($lockKey)) {
                  echo "[Task {$taskName} {$date}] 间隔时间调用\n";
                  return;
              }

              $res = $serv->task([
                  'taskName' => $taskName,
                  'name' => $label,
                  'date' => date('Y-m-d'),
              ]);
              RedisService::set($lockKey, true, rand(3, 5));
              echo "[Task {$taskName} {$date}] {$label}:{$res}\n";
          });
      }
  }
  /**
   * Connection-opened callback: greets the client with its descriptor number.
   *
   * NOTE(review): start() does not register this handler.
   *
   * @param mixed $serv    Server object used to send the greeting.
   * @param mixed $fd      Descriptor of the connection.
   * @param mixed $from_id Unused.
   */
  public function onConnect($serv, $fd, $from_id)
  {
      $greeting = "Success {$fd}!";
      $serv->send($fd, $greeting);
  }
  /**
   * Data-received callback: acknowledges the client, then hands the raw
   * payload to a task worker.
   *
   * NOTE(review): the payload is forwarded as received; onTask() looks for a
   * 'taskName' key, so confirm what shape clients are expected to send.
   *
   * @param \Swoole\Server $serv
   * @param mixed $fd      Descriptor of the sending connection.
   * @param mixed $from_id Unused.
   * @param mixed $data    Payload received from the client.
   */
  public function onReceive(\Swoole\Server $serv, $fd, $from_id, $data)
  {
      echo "Get Message From Client {$fd}:{$data}\n";

      // Reply to the client before the task is queued.
      $reply = json_encode(['result' => 'success']);
      $serv->send($fd, $reply);

      // Queue the payload for asynchronous processing.
      $serv->task($data);
  }
  /**
   * Task callback: runs the job named by $data['taskName'].
   *
   * Each known job has a blocked time-of-day window (compared as 'H:i'
   * strings). Inside the window the job is skipped and false is returned.
   * Otherwise the matching service method is called and its outcome echoed.
   *
   * @param \Swoole\Server $serv
   * @param mixed $task_id Unused.
   * @param mixed $from_id Unused.
   * @param mixed $data    Task payload; the 'taskName' key selects the job.
   * @return false|string false when the job is inside its blocked window;
   *                      otherwise the fixed string '暂无任务处理'.
   */
  public function onTask(\Swoole\Server $serv, $task_id, $from_id, $data)
  {
      $date = date('Y-m-d H:i:s');
      $taskName = isset($data['taskName']) ? $data['taskName'] : '';

      // Job name => service class, method, blocked window (inclusive), result label.
      $jobs = [
          'UpdateGoods' => [
              'service' => GoodsService::class,
              'method' => 'updateGoods',
              'blockedFrom' => '00:00',
              'blockedTo' => '03:00',
              'label' => '商品数据获取更新结果',
          ],
          'UpdateGoodsSku' => [
              'service' => GoodsService::class,
              'method' => 'updateGoodsSku',
              'blockedFrom' => '00:00',
              'blockedTo' => '03:00',
              'label' => '更新商品SKu数据结果',
          ],
          'UpdateGoodsCategory' => [
              'service' => GoodsService::class,
              'method' => 'updateGoodsCategory',
              'blockedFrom' => '00:00',
              'blockedTo' => '04:00',
              'label' => '更新商品分类数据结果',
          ],
          'GrantScore' => [
              'service' => FinanceService::class,
              'method' => 'grantScore',
              'blockedFrom' => '09:00',
              'blockedTo' => '23:59',
              'label' => '每日发放积分结果',
          ],
          'ReturnWaitScore' => [
              'service' => FinanceService::class,
              'method' => 'returnWaitScore',
              'blockedFrom' => '10:00',
              'blockedTo' => '23:59',
              'label' => '待返积分每日返还结果',
          ],
      ];

      if (!is_string($taskName) || !isset($jobs[$taskName])) {
          return '暂无任务处理';
      }
      $job = $jobs[$taskName];

      // Time-of-day restriction.
      $now = date('H:i');
      if ($now >= $job['blockedFrom'] && $now <= $job['blockedTo']) {
          echo "[Task {$taskName} {$date}] 不在运行时间段内\n";
          return false;
      }

      $serviceClass = $job['service'];
      $method = $job['method'];
      $label = $job['label'];

      $res = $serviceClass::make()->{$method}();
      if ($res) {
          $res = is_array($res) && $res ? json_encode($res, JSON_UNESCAPED_UNICODE) : 'success';
          echo "[Task {$taskName} {$date}] {$label}:{$res}\n";
      } else {
          // The error is read from a second make() call, as before.
          $error = $serviceClass::make()->getError();
          $error = $error ? lang($error) : 'failed';
          echo "[Task {$taskName} {$date}] {$label}:{$error}\n";
      }

      return '暂无任务处理';
  }
  /**
   * Task-finished callback: only logs that a task has completed.
   *
   * @param \Swoole\Server $serv
   * @param mixed $task_id Id of the finished task (unused).
   * @param mixed $data    Value returned by the task (unused).
   */
  public function onFinish(\Swoole\Server $serv, $task_id, $data)
  {
      $message = "任务处理完成...\n";
      echo $message;
  }
  /**
   * Connection-closed callback: logs which client disconnected.
   *
   * The previous body also called $serv->close() with no descriptor, which is
   * not a valid call, and the connection is already being closed when this
   * callback runs, so no explicit close is issued here.
   *
   * NOTE(review): start() does not register this handler.
   *
   * @param mixed $serv    Server object (unused).
   * @param mixed $fd      Descriptor of the closed connection.
   * @param mixed $from_id Unused.
   */
  public function onClose($serv, $fd, $from_id)
  {
      echo "Client {$fd} close connection\n";
  }
  431. }