SwooleTask.php 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Services\Api\FinanceService;
  4. use App\Services\Api\OrderService;
  5. use App\Services\RedisService;
  6. use App\Services\WalletService;
  7. use Illuminate\Console\Command;
  8. use Illuminate\Support\Facades\DB;
  9. class SwooleTask extends Command
  10. {
  11. protected $serv;
  12. protected $host = '127.0.0.1';
  13. protected $port = 6622;
  14. // 进程名称
  15. protected $taskName = 'swooleTask';
  16. // PID路径
  17. protected $pidPath = '/storage/swoole.pid';
  18. // task
  19. protected $onlyReloadTaskWorker = false;
  20. // 设置运行时参数
  21. protected $options = [
  22. 'worker_num' => 8, //worker进程数,一般设置为CPU数的1-4倍
  23. 'daemonize' => true, //启用守护进程
  24. 'log_file' => '/storage/logs/swoole-task.log', //指定swoole错误日志文件
  25. 'log_level' => 0, //日志级别 范围是0-5,0-DEBUG,1-TRACE,2-INFO,3-NOTICE,4-WARNING,5-ERROR
  26. 'dispatch_mode' => 1, //数据包分发策略,1-轮询模式
  27. 'task_worker_num' => 6, //task进程的数量
  28. 'task_ipc_mode' => 3, //使用消息队列通信,并设置为争抢模式
  29. ];
  30. /**
  31. * The name and signature of the console command.
  32. *
  33. * @var string
  34. */
  35. protected $signature = 'swoole:task {op}';
  36. /**
  37. * The console command description.
  38. *
  39. * @var string
  40. */
  41. protected $description = 'Swoole task server description';
  42. /**
  43. * Create a new command instance.
  44. *
  45. * @return void
  46. */
  47. public function __construct()
  48. {
  49. parent::__construct();
  50. }
  51. /**
  52. * 入口
  53. * Execute the console command.
  54. *
  55. * @return mixed
  56. */
  57. public function handle()
  58. {
  59. ini_set("default_socket_timeout", -1);
  60. // 项目根目录
  61. defined('ROOT_PATH') or define('ROOT_PATH', base_path());
  62. // 文件上传目录
  63. defined('ATTACHMENT_PATH') or define('ATTACHMENT_PATH', base_path('public/uploads'));
  64. // 图片上传目录
  65. defined('IMG_PATH') or define('IMG_PATH', base_path('public/uploads/images'));
  66. // 临时存放目录
  67. defined('UPLOAD_TEMP_PATH') or define('UPLOAD_TEMP_PATH', ATTACHMENT_PATH . "/temp");
  68. // 定义普通图片域名
  69. defined('IMG_URL') or define('IMG_URL', env('IMG_URL'));
  70. // 数据表前缀
  71. defined('DB_PREFIX') or define('DB_PREFIX', DB::connection()->getTablePrefix());
  72. $this->options['log_file'] = base_path() . $this->options['log_file'];
  73. $this->pidPath = base_path() . $this->pidPath;
  74. $op = $this->argument('op');
  75. switch ($op) {
  76. case 'status': // 状态
  77. $res = $this->status();
  78. echo $res ? $res : 0;
  79. break;
  80. case 'start': // 运行
  81. return $this->start();
  82. break;
  83. case 'reload': // 平滑重启
  84. return $this->reload();
  85. break;
  86. case 'stop': // 停止运行
  87. return $this->stop();
  88. break;
  89. default:
  90. exit("{$op} command does not exist");
  91. break;
  92. }
  93. }
  94. /**
  95. * 启动
  96. */
  97. public function start()
  98. {
  99. date_default_timezone_set('PRC');
  100. // 构建Server对象,监听对应地址
  101. $this->serv = new \Swoole\Server($this->host, $this->port);
  102. $this->serv->set($this->options);
  103. // 注册事件
  104. $this->serv->on('start', [$this, 'onStart']);
  105. $this->serv->on('receive', [$this, 'onReceive']);
  106. $this->serv->on('task', [$this, 'onTask']);
  107. $this->serv->on('finish', [$this, 'onFinish']);
  108. // Run worker
  109. echo "swoole start...\n";
  110. $this->serv->start();
  111. }
  112. // 安全重启
  113. public function reload()
  114. {
  115. $pids = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
  116. $pids = $pids ? explode("\n", $pids) : [];
  117. $masterPid = isset($pids[0]) ? $pids[0] : '';
  118. $managePid = isset($pids[1]) ? $pids[1] : '';
  119. if (empty($masterPid)) {
  120. return false;
  121. }
  122. if (!$this->status($masterPid)) {
  123. return false;
  124. }
  125. \Swoole\Process::kill($managePid, SIGUSR1);
  126. echo "swoole reload...\n";
  127. }
  128. /**
  129. * 停止
  130. * @param bool $smooth
  131. * @return bool
  132. */
  133. public function stop($smooth = false)
  134. {
  135. $pids = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
  136. $pids = $pids ? explode("\n", $pids) : [];
  137. $masterPid = isset($pids[0]) ? $pids[0] : '';
  138. $managePid = isset($pids[1]) ? $pids[1] : '';
  139. if (empty($masterPid)) {
  140. return false;
  141. }
  142. if (!$this->status($masterPid)) {
  143. return false;
  144. }
  145. // if ($smooth) {
  146. // \Swoole\Process::kill($masterPid, SIGTERM);
  147. // try {
  148. // while (true) {
  149. // \Swoole\Process::kill($masterPid, 0);
  150. // }
  151. // } catch (\Exception $exception) {
  152. //
  153. // }
  154. // } else {
  155. // \Swoole\Process::kill($masterPid, SIGKILL);
  156. // }
  157. //
  158. // if($managePid){
  159. // \Swoole\Process::kill($managePid, SIGKILL);
  160. // }
  161. // 直接杀
  162. $stoSh = base_path().'/crontab/swooleTaskStop.sh';
  163. echo $stoSh;
  164. if(file_exists($stoSh) && function_exists('exec')){
  165. exec("{$stoSh}");
  166. }
  167. @unlink($this->pidPath);
  168. echo "swoole stop...\n";
  169. }
  170. /**
  171. * 状态
  172. * @return mixed
  173. */
  174. public function status($masterPid = 0)
  175. {
  176. $res = false;
  177. if (empty($masterPid) && file_exists($this->pidPath)) {
  178. $pids = file_get_contents($this->pidPath);
  179. $pids = $pids ? explode("\n", $pids) : [];
  180. $masterPid = isset($pids[0]) ? $pids[0] : '';
  181. }
  182. if ($masterPid) {
  183. $res = \Swoole\Process::kill($masterPid, 0);
  184. }
  185. return $res;
  186. }
  187. public function onStart($serv)
  188. {
  189. if (!is_dir(dirname($this->pidPath))) {
  190. @mkdir(dirname($this->pidPath), true, 755);
  191. }
  192. //记录进程id,脚本实现自动重启
  193. $pid = "{$serv->master_pid}\n{$serv->manager_pid}";
  194. file_put_contents($this->pidPath, $pid);
  195. // 定时任务
  196. $time = 0;
  197. $date = date('Y-m-d H:i:s');
  198. if(file_exists($this->options['log_file'])){
  199. $time = 0;
  200. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  201. }
  202. // TODO 更新USDT价格
  203. \swoole_timer_tick(30000, function ($timer) use ($serv, &$time) { // 启用定时器,每120秒执行一次
  204. $date = date('Y-m-d H:i:s');
  205. if($time>3600 && file_exists($this->options['log_file'])){
  206. $time = 0;
  207. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  208. }
  209. $time++;
  210. if(!RedisService::get('caches:task:lock:usdt_loaded')){
  211. $taskData = [
  212. 'taskName' => 'UpdateUsdtPrice',
  213. 'name' => "更新USDT价格",
  214. 'date' => date('Y-m-d'),
  215. ];
  216. $res = $serv->task($taskData);
  217. RedisService::set('caches:task:lock:usdt_loaded', true, rand(3,5));
  218. echo "[Task UpdateUsdtPrice {$date}] 更新USDT价格:{$res}\n";
  219. }else{
  220. echo "[Task UpdateUsdtPrice {$date}] 间隔时间调用\n";
  221. }
  222. });
  223. // TODO 发放积分
  224. \swoole_timer_tick(180000, function ($timer) use ($serv, &$time) { // 启用定时器,每180秒执行一次
  225. $date = date('Y-m-d H:i:s');
  226. if($time>3600 && file_exists($this->options['log_file'])){
  227. $time = 0;
  228. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  229. }
  230. $time++;
  231. if(!RedisService::get('caches:task:lock:grant_score_loaded')){
  232. $taskData = [
  233. 'taskName' => 'GrantScore',
  234. 'name' => "每日发放积分",
  235. 'date' => date('Y-m-d'),
  236. ];
  237. $res = $serv->task($taskData);
  238. RedisService::set('caches:task:lock:grant_score_loaded', true, rand(3,5));
  239. echo "[Task GrantScore {$date}] 每日发放积分:{$res}\n";
  240. }else{
  241. echo "[Task GrantScore {$date}] 间隔时间调用\n";
  242. }
  243. });
  244. // TODO 待返积分返还
  245. \swoole_timer_tick(120000, function ($timer) use ($serv, &$time) { // 启用定时器,每120秒执行一次
  246. $date = date('Y-m-d H:i:s');
  247. if($time>3600 && file_exists($this->options['log_file'])){
  248. $time = 0;
  249. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  250. }
  251. $time++;
  252. if(!RedisService::get('caches:task:lock:wait_score_loaded')){
  253. $taskData = [
  254. 'taskName' => 'ReturnWaitScore',
  255. 'name' => "待返积分每日返还",
  256. 'date' => date('Y-m-d'),
  257. ];
  258. $res = $serv->task($taskData);
  259. RedisService::set('caches:task:lock:wait_score_loaded', true, rand(3,5));
  260. echo "[Task ReturnWaitScore {$date}] 待返积分每日返还:{$res}\n";
  261. }else{
  262. echo "[Task ReturnWaitScore {$date}] 间隔时间调用\n";
  263. }
  264. });
  265. // TODO 全球分红结算
  266. \swoole_timer_tick(120000, function ($timer) use ($serv, &$time) { // 启用定时器,每120秒执行一次
  267. $date = date('Y-m-d H:i:s');
  268. if($time>3600 && file_exists($this->options['log_file'])){
  269. $time = 0;
  270. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  271. }
  272. $time++;
  273. if(!RedisService::get('caches:task:lock:global_loaded')){
  274. $taskData = [
  275. 'taskName' => 'GlobalBonus',
  276. 'name' => "待返积分每日返还",
  277. 'date' => date('Y-m-d'),
  278. ];
  279. $res = $serv->task($taskData);
  280. RedisService::set('caches:task:lock:global_loaded', true, rand(3,5));
  281. echo "[Task GlobalBonus {$date}] 全球分红结算:{$res}\n";
  282. }else{
  283. echo "[Task GlobalBonus {$date}] 间隔时间调用\n";
  284. }
  285. });
  286. // TODO 更新订单状态
  287. \swoole_timer_tick(300000, function ($timer) use ($serv, &$time) { // 启用定时器,每300秒执行一次
  288. $date = date('Y-m-d H:i:s');
  289. if($time>3600 && file_exists($this->options['log_file'])){
  290. $time = 0;
  291. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  292. }
  293. $time++;
  294. if(!RedisService::get('caches:task:lock:order_loaded')){
  295. $taskData = [
  296. 'taskName' => 'UpdateOrderStatus',
  297. 'name' => "更新订单状态",
  298. 'date' => date('Y-m-d'),
  299. ];
  300. $res = $serv->task($taskData);
  301. RedisService::set('caches:task:lock:order_loaded', true, rand(3,5));
  302. echo "[Task UpdateOrderStatus {$date}] 更新订单状态:{$res}\n";
  303. }else{
  304. echo "[Task UpdateOrderStatus {$date}] 间隔时间调用\n";
  305. }
  306. });
  307. // TODO 矿机每日释放
  308. \swoole_timer_tick(300000, function ($timer) use ($serv, &$time) { // 启用定时器,每300秒执行一次
  309. $date = date('Y-m-d H:i:s');
  310. if($time>3600 && file_exists($this->options['log_file'])){
  311. $time = 0;
  312. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  313. }
  314. $time++;
  315. if(!RedisService::get('caches:task:lock:wait_score_loaded')){
  316. $taskData = [
  317. 'taskName' => 'ReturnWaitScore',
  318. 'name' => "待返积分每日返还",
  319. 'date' => date('Y-m-d'),
  320. ];
  321. $res = $serv->task($taskData);
  322. RedisService::set('caches:task:lock:wait_score_loaded', true, rand(3,5));
  323. echo "[Task ReturnWaitScore {$date}] 待返积分每日返还:{$res}\n";
  324. }else{
  325. echo "[Task ReturnWaitScore {$date}] 间隔时间调用\n";
  326. }
  327. });
  328. }
  329. //监听连接进入事件
  330. public function onConnect($serv, $fd, $from_id)
  331. {
  332. $serv->send($fd, "Success {$fd}!");
  333. }
  334. // 监听数据接收事件
  335. public function onReceive(\Swoole\Server $serv, $fd, $from_id, $data)
  336. {
  337. echo "Get Message From Client {$fd}:{$data}\n";
  338. $res['result'] = 'success';
  339. $serv->send($fd, json_encode($res)); // 同步返回消息给客户端
  340. $serv->task($data); // 执行异步任务
  341. }
  342. /**
  343. * @param \Swoole\Server $serv
  344. * @param $task_id
  345. * @param $from_id
  346. * @param $data
  347. * @return false|string
  348. */
  349. public function onTask(\Swoole\Server $serv, $task_id, $from_id, $data)
  350. {
  351. $date = date('Y-m-d H:i:s');
  352. $taskName = isset($data['taskName']) ? $data['taskName'] : '';
  353. try {
  354. switch ($taskName) {
  355. case 'UpdateOrderStatus': // 更新订单状态
  356. // 时间限制
  357. if(date('H:i') >= '00:00' && date('H:i') <= '02:00'){
  358. echo "[Task {$taskName} {$date}] 不在运行时间段内\n";
  359. return false;
  360. }
  361. // 调用处理
  362. if($res = OrderService::make()->updateOrderStatus()){
  363. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  364. echo "[Task {$taskName} {$date}] 更新订单状态结果:{$res}\n";
  365. }else{
  366. $error = OrderService::make()->getError();
  367. $error = $error? lang($error) : 'failed';
  368. echo "[Task {$taskName} {$date}] 更新订单状态结果:{$error}\n";
  369. }
  370. break;
  371. case 'UpdateOrderRefundStatus': // 更新售后订单状态
  372. // 时间限制
  373. if(date('H:i') <= '04:00' || (date('H:i') >= '08:00' && date('H:i') <= '20:00')){
  374. echo "[Task {$taskName} {$date}] 不在运行时间段内\n";
  375. return false;
  376. }
  377. // 调用处理
  378. if($res = OrderService::make()->updateOrderRefundStatus()){
  379. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  380. echo "[Task {$taskName} {$date}] 更新售后订单状态结果:{$res}\n";
  381. }else{
  382. $error = OrderService::make()->getError();
  383. $error = $error? lang($error) : 'failed';
  384. echo "[Task {$taskName} {$date}] 更新售后订单状态结果:{$error}\n";
  385. }
  386. break;
  387. case 'UpdateUsdtPrice': // 更新USDT价格
  388. // 时间限制
  389. if(date('H:i') >= '01:00' && date('H:i') <= '02:00'){
  390. echo "[Task {$taskName} {$date}] 不在运行时间段内\n";
  391. return false;
  392. }
  393. // 调用处理
  394. if($res = WalletService::make()->getBianRatePrice(true)){
  395. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  396. echo "[Task {$taskName} {$date}] 更新USDT价格结果:{$res}\n";
  397. }else{
  398. $error = WalletService::make()->getError();
  399. $error = $error? lang($error) : 'failed';
  400. echo "[Task {$taskName} {$date}] 更新USDT价格结果:{$error}\n";
  401. }
  402. break;
  403. case 'GrantScore': // 空投积分
  404. // 时间限制
  405. if(date('H:i') >= '18:00'){
  406. echo "[Task {$taskName} {$date}] 不在运行时间段内\n";
  407. return false;
  408. }
  409. // 调用处理
  410. if($res = FinanceService::make()->grantScore()){
  411. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  412. echo "[Task {$taskName} {$date}] 每日发放积分结果:{$res}\n";
  413. }else{
  414. $error = FinanceService::make()->getError();
  415. $error = $error? lang($error) : 'failed';
  416. echo "[Task {$taskName} {$date}] 每日发放积分结果:{$error}\n";
  417. }
  418. break;
  419. case 'ReturnWaitScore':
  420. // 时间限制
  421. if(date('H:i') >= '06:00'){
  422. echo "[Task {$taskName} {$date}] 不在运行时间段内\n";
  423. return false;
  424. }
  425. // 调用处理
  426. if($res = FinanceService::make()->returnWaitScore()){
  427. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  428. echo "[Task {$taskName} {$date}] 待返积分每日返还结果:{$res}\n";
  429. }else{
  430. $error = FinanceService::make()->getError();
  431. $error = $error? lang($error) : 'failed';
  432. echo "[Task {$taskName} {$date}] 待返积分每日返还结果:{$error}\n";
  433. }
  434. break;
  435. case 'GlobalBonus': // 全球分红
  436. // 时间限制
  437. if(date('H:i') >= '05:00'){
  438. echo "[Task {$taskName} {$date}] 不在运行时间段内\n";
  439. return false;
  440. }
  441. // 调用处理
  442. if($res = FinanceService::make()->globalBonus()){
  443. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  444. echo "[Task {$taskName} {$date}] 全球分红结算结果:{$res}\n";
  445. }else{
  446. $error = FinanceService::make()->getError();
  447. $error = $error? lang($error) : 'failed';
  448. echo "[Task {$taskName} {$date}] 全球分红结算结果:{$error}\n";
  449. }
  450. break;
  451. }
  452. } catch(\Exception $exception){
  453. return $exception->getMessage();
  454. }
  455. return '暂无任务处理';
  456. }
  457. /**
  458. * @param $serv swoole_server swoole_server对象
  459. * @param $task_id int 任务id
  460. * @param $data string 任务返回的数据
  461. */
  462. public function onFinish(\Swoole\Server $serv, $task_id, $data)
  463. {
  464. //
  465. echo "任务处理完成...\n";
  466. }
  467. // 监听连接关闭事件
  468. public function onClose($serv, $fd, $from_id)
  469. {
  470. echo "Client {$fd} close connection\n";
  471. $serv->close();
  472. }
  473. }