SwooleTask.php 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Services\Api\FinanceService;
  4. use App\Services\Api\GoodsCategoryService;
  5. use App\Services\Api\GoodsService;
  6. use App\Services\RedisService;
  7. use App\Services\WalletService;
  8. use Illuminate\Console\Command;
  9. use Illuminate\Support\Facades\DB;
  10. class SwooleTask extends Command
  11. {
  12. protected $serv;
  13. protected $host = '127.0.0.1';
  14. protected $port = 6622;
  15. // 进程名称
  16. protected $taskName = 'swooleTask';
  17. // PID路径
  18. protected $pidPath = '/storage/swoole.pid';
  19. // task
  20. protected $onlyReloadTaskWorker = false;
  21. // 设置运行时参数
  22. protected $options = [
  23. 'worker_num' => 8, //worker进程数,一般设置为CPU数的1-4倍
  24. 'daemonize' => true, //启用守护进程
  25. 'log_file' => '/storage/logs/swoole-task.log', //指定swoole错误日志文件
  26. 'log_level' => 0, //日志级别 范围是0-5,0-DEBUG,1-TRACE,2-INFO,3-NOTICE,4-WARNING,5-ERROR
  27. 'dispatch_mode' => 1, //数据包分发策略,1-轮询模式
  28. 'task_worker_num' => 6, //task进程的数量
  29. 'task_ipc_mode' => 3, //使用消息队列通信,并设置为争抢模式
  30. ];
  31. /**
  32. * The name and signature of the console command.
  33. *
  34. * @var string
  35. */
  36. protected $signature = 'swoole:task {op}';
  37. /**
  38. * The console command description.
  39. *
  40. * @var string
  41. */
  42. protected $description = 'Swoole task server description';
  43. /**
  44. * Create a new command instance.
  45. *
  46. * @return void
  47. */
  48. public function __construct()
  49. {
  50. parent::__construct();
  51. }
  52. /**
  53. * 入口
  54. * Execute the console command.
  55. *
  56. * @return mixed
  57. */
  58. public function handle()
  59. {
  60. ini_set("default_socket_timeout", -1);
  61. // 项目根目录
  62. defined('ROOT_PATH') or define('ROOT_PATH', base_path());
  63. // 文件上传目录
  64. defined('ATTACHMENT_PATH') or define('ATTACHMENT_PATH', base_path('public/uploads'));
  65. // 图片上传目录
  66. defined('IMG_PATH') or define('IMG_PATH', base_path('public/uploads/images'));
  67. // 临时存放目录
  68. defined('UPLOAD_TEMP_PATH') or define('UPLOAD_TEMP_PATH', ATTACHMENT_PATH . "/temp");
  69. // 定义普通图片域名
  70. defined('IMG_URL') or define('IMG_URL', env('IMG_URL'));
  71. // 数据表前缀
  72. defined('DB_PREFIX') or define('DB_PREFIX', DB::connection()->getTablePrefix());
  73. $this->options['log_file'] = base_path() . $this->options['log_file'];
  74. $this->pidPath = base_path() . $this->pidPath;
  75. $op = $this->argument('op');
  76. switch ($op) {
  77. case 'status': // 状态
  78. $res = $this->status();
  79. echo $res ? $res : 0;
  80. break;
  81. case 'start': // 运行
  82. return $this->start();
  83. break;
  84. case 'reload': // 平滑重启
  85. return $this->reload();
  86. break;
  87. case 'stop': // 停止运行
  88. return $this->stop();
  89. break;
  90. default:
  91. exit("{$op} command does not exist");
  92. break;
  93. }
  94. }
  95. /**
  96. * 启动
  97. */
  98. public function start()
  99. {
  100. date_default_timezone_set('PRC');
  101. // 构建Server对象,监听对应地址
  102. $this->serv = new \Swoole\Server($this->host, $this->port);
  103. $this->serv->set($this->options);
  104. // 注册事件
  105. $this->serv->on('start', [$this, 'onStart']);
  106. $this->serv->on('receive', [$this, 'onReceive']);
  107. $this->serv->on('task', [$this, 'onTask']);
  108. $this->serv->on('finish', [$this, 'onFinish']);
  109. // Run worker
  110. echo "swoole start...\n";
  111. $this->serv->start();
  112. }
  113. // 安全重启
  114. public function reload()
  115. {
  116. $pids = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
  117. $pids = $pids ? explode("\n", $pids) : [];
  118. $masterPid = isset($pids[0]) ? $pids[0] : '';
  119. $managePid = isset($pids[1]) ? $pids[1] : '';
  120. if (empty($masterPid)) {
  121. return false;
  122. }
  123. if (!$this->status($masterPid)) {
  124. return false;
  125. }
  126. \Swoole\Process::kill($managePid, SIGUSR1);
  127. echo "swoole reload...\n";
  128. }
  129. /**
  130. * 停止
  131. * @param bool $smooth
  132. * @return bool
  133. */
  134. public function stop($smooth = false)
  135. {
  136. $pids = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
  137. $pids = $pids ? explode("\n", $pids) : [];
  138. $masterPid = isset($pids[0]) ? $pids[0] : '';
  139. $managePid = isset($pids[1]) ? $pids[1] : '';
  140. if (empty($masterPid)) {
  141. return false;
  142. }
  143. if (!$this->status($masterPid)) {
  144. return false;
  145. }
  146. // if ($smooth) {
  147. // \Swoole\Process::kill($masterPid, SIGTERM);
  148. // try {
  149. // while (true) {
  150. // \Swoole\Process::kill($masterPid, 0);
  151. // }
  152. // } catch (\Exception $exception) {
  153. //
  154. // }
  155. // } else {
  156. // \Swoole\Process::kill($masterPid, SIGKILL);
  157. // }
  158. //
  159. // if($managePid){
  160. // \Swoole\Process::kill($managePid, SIGKILL);
  161. // }
  162. // 直接杀
  163. $stoSh = base_path().'/crontab/swooleTaskStop.sh';
  164. echo $stoSh;
  165. if(file_exists($stoSh) && function_exists('exec')){
  166. exec("{$stoSh}");
  167. }
  168. @unlink($this->pidPath);
  169. echo "swoole stop...\n";
  170. }
  171. /**
  172. * 状态
  173. * @return mixed
  174. */
  175. public function status($masterPid = 0)
  176. {
  177. $res = false;
  178. if (empty($masterPid) && file_exists($this->pidPath)) {
  179. $pids = file_get_contents($this->pidPath);
  180. $pids = $pids ? explode("\n", $pids) : [];
  181. $masterPid = isset($pids[0]) ? $pids[0] : '';
  182. }
  183. if ($masterPid) {
  184. $res = \Swoole\Process::kill($masterPid, 0);
  185. }
  186. return $res;
  187. }
  188. public function onStart($serv)
  189. {
  190. if (!is_dir(dirname($this->pidPath))) {
  191. @mkdir(dirname($this->pidPath), true, 755);
  192. }
  193. //记录进程id,脚本实现自动重启
  194. $pid = "{$serv->master_pid}\n{$serv->manager_pid}";
  195. file_put_contents($this->pidPath, $pid);
  196. // 定时任务
  197. $time = 0;
  198. $date = date('Y-m-d H:i:s');
  199. if(file_exists($this->options['log_file'])){
  200. $time = 0;
  201. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  202. }
  203. // TODO 更新USDT价格
  204. \swoole_timer_tick(30000, function ($timer) use ($serv, &$time) { // 启用定时器,每120秒执行一次
  205. $date = date('Y-m-d H:i:s');
  206. if($time>3600 && file_exists($this->options['log_file'])){
  207. $time = 0;
  208. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  209. }
  210. $time++;
  211. if(!RedisService::get('caches:task:lock:usdt_loaded')){
  212. $taskData = [
  213. 'taskName' => 'UpdateUsdtPrice',
  214. 'name' => "更新USDT价格",
  215. 'date' => date('Y-m-d'),
  216. ];
  217. $res = $serv->task($taskData);
  218. RedisService::set('caches:task:lock:usdt_loaded', true, rand(3,5));
  219. echo "[Task UpdateUsdtPrice {$date}] 更新USDT价格:{$res}\n";
  220. }else{
  221. echo "[Task UpdateUsdtPrice {$date}] 间隔时间调用\n";
  222. }
  223. });
  224. // TODO 更新商品数据
  225. \swoole_timer_tick(120000, function ($timer) use ($serv, &$time) { // 启用定时器,每120秒执行一次
  226. $date = date('Y-m-d H:i:s');
  227. if($time>3600 && file_exists($this->options['log_file'])){
  228. $time = 0;
  229. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  230. }
  231. $time++;
  232. if(!RedisService::get('caches:task:lock:goods_loaded')){
  233. $taskData = [
  234. 'taskName' => 'UpdateGoods',
  235. 'name' => "更新商品数据",
  236. 'date' => date('Y-m-d'),
  237. ];
  238. $res = $serv->task($taskData);
  239. RedisService::set('caches:task:lock:goods_loaded', true, rand(3,5));
  240. echo "[Task UpdateGoods {$date}] 更新商品数据:{$res}\n";
  241. }else{
  242. echo "[Task UpdateGoods {$date}] 间隔时间调用\n";
  243. }
  244. });
  245. // TODO 更新商品分类
  246. \swoole_timer_tick(120000, function ($timer) use ($serv, &$time) { // 启用定时器,每120秒执行一次
  247. $date = date('Y-m-d H:i:s');
  248. if($time>3600 && file_exists($this->options['log_file'])){
  249. $time = 0;
  250. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  251. }
  252. $time++;
  253. if(!RedisService::get('caches:task:lock:goods_category_loaded')){
  254. $taskData = [
  255. 'taskName' => 'UpdateGoodsCategory',
  256. 'name' => "更新商品分类数据",
  257. 'date' => date('Y-m-d'),
  258. ];
  259. $res = $serv->task($taskData);
  260. RedisService::set('caches:task:lock:goods_category_loaded', true, rand(3,5));
  261. echo "[Task UpdateGoodsCategory {$date}] 更新商品分类数据:{$res}\n";
  262. }else{
  263. echo "[Task UpdateGoodsCategory {$date}] 间隔时间调用\n";
  264. }
  265. });
  266. // TODO 更新商品分类
  267. \swoole_timer_tick(120000, function ($timer) use ($serv, &$time) { // 启用定时器,每120秒执行一次
  268. $date = date('Y-m-d H:i:s');
  269. if($time>3600 && file_exists($this->options['log_file'])){
  270. $time = 0;
  271. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  272. }
  273. $time++;
  274. $cateIds = GoodsCategoryService::make()->getCateIds();
  275. if($cateIds){
  276. foreach($cateIds as $item){
  277. $pid = isset($item['cate_id'])? $item['cate_id'] : 0;
  278. if($pid && !RedisService::get("caches:task:lock:goods_category_sub_loaded_{$pid}")){
  279. $taskData = [
  280. 'taskName' => 'UpdateGoodsCategorySub',
  281. 'name' => "更新商品分类【{$pid}】的子分类数据",
  282. 'pid'=> $pid,
  283. 'date' => date('Y-m-d'),
  284. ];
  285. $res = $serv->task($taskData);
  286. RedisService::set("caches:task:lock:goods_category_sub_loaded_{$pid}", true, rand(3,5));
  287. echo "[Task UpdateGoodsCategorySub {$date}] 更新商品分类【{$pid}】的子分类数据:{$res}\n";
  288. }else{
  289. echo "[Task UpdateGoodsCategorySub {$date}] 间隔时间调用\n";
  290. }
  291. }
  292. }else{
  293. echo "[Task UpdateGoodsCategorySub {$date}] 没有父级数据\n";
  294. }
  295. });
  296. // TODO 更新商品SKU数据
  297. \swoole_timer_tick(120000, function ($timer) use ($serv, &$time) { // 启用定时器,每120秒执行一次
  298. $date = date('Y-m-d H:i:s');
  299. if($time>3600 && file_exists($this->options['log_file'])){
  300. $time = 0;
  301. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  302. }
  303. $time++;
  304. if(!RedisService::get('caches:task:lock:goods_sku_loaded')){
  305. $taskData = [
  306. 'taskName' => 'UpdateGoodsSku',
  307. 'name' => "更新商品SKU数据",
  308. 'date' => date('Y-m-d'),
  309. ];
  310. $res = $serv->task($taskData);
  311. RedisService::set('caches:task:lock:goods_sku_loaded', true, rand(3,5));
  312. echo "[Task UpdateGoodsSku {$date}] 更新商品SKU数据:{$res}\n";
  313. }else{
  314. echo "[Task UpdateGoodsSku {$date}] 间隔时间调用\n";
  315. }
  316. });
  317. // TODO 发放积分
  318. \swoole_timer_tick(300000, function ($timer) use ($serv, &$time) { // 启用定时器,每1200秒执行一次
  319. $date = date('Y-m-d H:i:s');
  320. if($time>3600 && file_exists($this->options['log_file'])){
  321. $time = 0;
  322. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  323. }
  324. $time++;
  325. if(!RedisService::get('caches:task:lock:grant_score_loaded')){
  326. $taskData = [
  327. 'taskName' => 'GrantScore',
  328. 'name' => "每日发放积分",
  329. 'date' => date('Y-m-d'),
  330. ];
  331. $res = $serv->task($taskData);
  332. RedisService::set('caches:task:lock:grant_score_loaded', true, rand(3,5));
  333. echo "[Task GrantScore {$date}] 每日发放积分:{$res}\n";
  334. }else{
  335. echo "[Task GrantScore {$date}] 间隔时间调用\n";
  336. }
  337. });
  338. // TODO 待返积分返还
  339. \swoole_timer_tick(300000, function ($timer) use ($serv, &$time) { // 启用定时器,每1200秒执行一次
  340. $date = date('Y-m-d H:i:s');
  341. if($time>3600 && file_exists($this->options['log_file'])){
  342. $time = 0;
  343. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  344. }
  345. $time++;
  346. if(!RedisService::get('caches:task:lock:waitt_score_loaded')){
  347. $taskData = [
  348. 'taskName' => 'ReturnWaitScore',
  349. 'name' => "待返积分每日返还",
  350. 'date' => date('Y-m-d'),
  351. ];
  352. $res = $serv->task($taskData);
  353. RedisService::set('caches:task:lock:waitt_score_loaded', true, rand(3,5));
  354. echo "[Task ReturnWaitScore {$date}] 待返积分每日返还:{$res}\n";
  355. }else{
  356. echo "[Task ReturnWaitScore {$date}] 间隔时间调用\n";
  357. }
  358. });
  359. }
  360. //监听连接进入事件
  361. public function onConnect($serv, $fd, $from_id)
  362. {
  363. $serv->send($fd, "Success {$fd}!");
  364. }
  365. // 监听数据接收事件
  366. public function onReceive(\Swoole\Server $serv, $fd, $from_id, $data)
  367. {
  368. echo "Get Message From Client {$fd}:{$data}\n";
  369. $res['result'] = 'success';
  370. $serv->send($fd, json_encode($res)); // 同步返回消息给客户端
  371. $serv->task($data); // 执行异步任务
  372. }
  373. /**
  374. * @param \Swoole\Server $serv
  375. * @param $task_id
  376. * @param $from_id
  377. * @param $data
  378. * @return false|string
  379. */
  380. public function onTask(\Swoole\Server $serv, $task_id, $from_id, $data)
  381. {
  382. $date = date('Y-m-d H:i:s');
  383. $taskName = isset($data['taskName']) ? $data['taskName'] : '';
  384. switch ($taskName) {
  385. case 'UpdateGoods': // 更新商品
  386. // 时间限制
  387. if(date('H:i') >= '00:00' && date('H:i') <= '03:00'){
  388. echo "[Task {$taskName} {$date}] 不在运行时间段内\n";
  389. return false;
  390. }
  391. // 调用处理
  392. if($res = GoodsService::make()->updateGoods()){
  393. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  394. echo "[Task {$taskName} {$date}] 商品数据获取更新结果:{$res}\n";
  395. }else{
  396. $error = GoodsService::make()->getError();
  397. $error = $error? lang($error) : 'failed';
  398. echo "[Task {$taskName} {$date}] 商品数据获取更新结果:{$error}\n";
  399. }
  400. break;
  401. case 'UpdateGoodsSku': // 更新商品SKu数据
  402. // 时间限制
  403. if(date('H:i') >= '00:00' && date('H:i') <= '03:00'){
  404. echo "[Task {$taskName} {$date}] 不在运行时间段内\n";
  405. return false;
  406. }
  407. // 调用处理
  408. if($res = GoodsService::make()->updateGoodsSku()){
  409. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  410. echo "[Task {$taskName} {$date}] 更新商品SKu数据结果:{$res}\n";
  411. }else{
  412. $error = GoodsService::make()->getError();
  413. $error = $error? lang($error) : 'failed';
  414. echo "[Task {$taskName} {$date}] 更新商品SKu数据结果:{$error}\n";
  415. }
  416. break;
  417. case 'UpdateGoodsCategory': // 更新商品分类数据
  418. // 时间限制
  419. if(date('H:i') >= '00:00' && date('H:i') <= '04:00'){
  420. echo "[Task {$taskName} {$date}] 不在运行时间段内\n";
  421. return false;
  422. }
  423. // 调用处理
  424. if($res = GoodsService::make()->updateGoodsCategory()){
  425. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  426. echo "[Task {$taskName} {$date}] 更新商品分类数据结果:{$res}\n";
  427. }else{
  428. $error = GoodsService::make()->getError();
  429. $error = $error? lang($error) : 'failed';
  430. echo "[Task {$taskName} {$date}] 更新商品分类数据结果:{$error}\n";
  431. }
  432. break;
  433. case 'UpdateGoodsCategorySub': // 更新商品分类数据
  434. // 时间限制
  435. if(date('H:i') >= '00:00' && date('H:i') <= '04:00'){
  436. echo "[Task {$taskName} {$date}] 不在运行时间段内\n";
  437. return false;
  438. }
  439. // 调用处理
  440. $pid = isset($data['pid'])? $data['pid'] : 0;
  441. if($res = GoodsService::make()->updateGoodsCategory($pid)){
  442. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  443. echo "[Task {$taskName} {$date}] 更新商品分类数据结果:{$res}\n";
  444. }else{
  445. $error = GoodsService::make()->getError();
  446. $error = $error? lang($error) : 'failed';
  447. echo "[Task {$taskName} {$date}] 更新商品分类数据结果:{$error}\n";
  448. }
  449. break;
  450. case 'UpdateUsdtPrice': // 更新USDT价格
  451. // 时间限制
  452. if(date('H:i') >= '01:00' && date('H:i') <= '04:00'){
  453. echo "[Task {$taskName} {$date}] 不在运行时间段内\n";
  454. return false;
  455. }
  456. // 调用处理
  457. if($res = WalletService::make()->getBianRatePrice(true)){
  458. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  459. echo "[Task {$taskName} {$date}] 更新USDT价格结果:{$res}\n";
  460. }else{
  461. $error = WalletService::make()->getError();
  462. $error = $error? lang($error) : 'failed';
  463. echo "[Task {$taskName} {$date}] 更新USDT价格结果:{$error}\n";
  464. }
  465. break;
  466. case 'GrantScore':
  467. // 时间限制
  468. if(date('H:i') >= '09:00'){
  469. echo "[Task {$taskName} {$date}] 不在运行时间段内\n";
  470. return false;
  471. }
  472. // 调用处理
  473. if($res = FinanceService::make()->grantScore()){
  474. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  475. echo "[Task {$taskName} {$date}] 每日发放积分结果:{$res}\n";
  476. }else{
  477. $error = FinanceService::make()->getError();
  478. $error = $error? lang($error) : 'failed';
  479. echo "[Task {$taskName} {$date}] 每日发放积分结果:{$error}\n";
  480. }
  481. break;
  482. case 'ReturnWaitScore':
  483. // 时间限制
  484. if(date('H:i') >= '10:00'){
  485. echo "[Task {$taskName} {$date}] 不在运行时间段内\n";
  486. return false;
  487. }
  488. // 调用处理
  489. if($res = FinanceService::make()->returnWaitScore()){
  490. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  491. echo "[Task {$taskName} {$date}] 待返积分每日返还结果:{$res}\n";
  492. }else{
  493. $error = FinanceService::make()->getError();
  494. $error = $error? lang($error) : 'failed';
  495. echo "[Task {$taskName} {$date}] 待返积分每日返还结果:{$error}\n";
  496. }
  497. break;
  498. }
  499. return '暂无任务处理';
  500. }
  501. /**
  502. * @param $serv swoole_server swoole_server对象
  503. * @param $task_id int 任务id
  504. * @param $data string 任务返回的数据
  505. */
  506. public function onFinish(\Swoole\Server $serv, $task_id, $data)
  507. {
  508. //
  509. echo "任务处理完成...\n";
  510. }
  511. // 监听连接关闭事件
  512. public function onClose($serv, $fd, $from_id)
  513. {
  514. echo "Client {$fd} close connection\n";
  515. $serv->close();
  516. }
  517. }