SwooleTask.php 23 KB


  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Services\Api\FinanceService;
  4. use App\Services\Api\MemberService;
  5. use App\Services\Api\PledgeOrderService;
  6. use App\Services\Api\PriceLogService;
  7. use App\Services\RedisService;
  8. use Illuminate\Console\Command;
  9. use Illuminate\Support\Facades\DB;
  10. class SwooleTask extends Command
  11. {
  // Swoole server instance; assigned in start().
  protected $serv;
  // Address and port handed to the \Swoole\Server constructor in start().
  protected $host = '127.0.0.1';
  protected $port = 6630;
  // Process name.
  // NOTE(review): not read anywhere in the visible code - confirm it is still needed.
  protected $taskName = 'swooleTask';
  // PID file path; handle() prepends base_path() before it is used.
  protected $pidPath = '/storage/swoole.pid';
  // Task-related flag (original comment said only "task").
  // NOTE(review): not read anywhere in the visible code - confirm it is still needed.
  protected $onlyReloadTaskWorker = false;
  // Runtime options passed to the server through set() in start().
  protected $options = [
      'worker_num' => 8, // Number of worker processes; usually 1-4x the CPU count
      'daemonize' => true, // Run as a daemon
      'log_file' => '/storage/logs/swoole-task.log', // Swoole error log file (handle() prepends base_path())
      'log_level' => 0, // Log level, range 0-5: 0-DEBUG, 1-TRACE, 2-INFO, 3-NOTICE, 4-WARNING, 5-ERROR
      'dispatch_mode' => 1, // Packet dispatch strategy: 1 = round-robin
      'task_worker_num' => 6, // Number of task processes
      'task_ipc_mode' => 3, // Communicate through a message queue, in contention mode
  ];
  /**
   * The name and signature of the console command.
   *
   * @var string
   */
  protected $signature = 'swoole:task {op}';
  /**
   * The console command description.
   *
   * @var string
   */
  protected $description = 'Swoole task server description';
  /**
   * Build the command.
   *
   * No extra setup happens here; construction is delegated entirely to the
   * parent command class.
   */
  public function __construct()
  {
      parent::__construct();
  }
  /**
   * Entry point: define the runtime constants, resolve the log/PID paths and
   * dispatch the operation given in the {op} argument.
   *
   * Supported operations: status, start, reload, stop.
   *
   * @return mixed Whatever start()/reload()/stop() return for those operations,
   *               null after "status", and 1 for an unknown operation so that
   *               the process ends with a non-zero exit status.
   */
  public function handle()
  {
      // A negative value disables PHP's default socket timeout.
      ini_set("default_socket_timeout", -1);
      // Project root directory
      defined('ROOT_PATH') or define('ROOT_PATH', base_path());
      // File upload directory
      defined('ATTACHMENT_PATH') or define('ATTACHMENT_PATH', base_path('public/uploads'));
      // Image upload directory
      defined('IMG_PATH') or define('IMG_PATH', base_path('public/uploads/images'));
      // Temporary storage directory
      defined('UPLOAD_TEMP_PATH') or define('UPLOAD_TEMP_PATH', ATTACHMENT_PATH . "/temp");
      // Domain used for ordinary images
      defined('IMG_URL') or define('IMG_URL', env('IMG_URL'));
      // Database table prefix
      defined('DB_PREFIX') or define('DB_PREFIX', DB::connection()->getTablePrefix());

      // Both paths are declared relative to the project root; make them absolute.
      $this->options['log_file'] = base_path() . $this->options['log_file'];
      $this->pidPath = base_path() . $this->pidPath;

      $op = $this->argument('op');
      switch ($op) {
          case 'status': // Print the status() result, or 0 when it is falsy
              $res = $this->status();
              echo $res ? $res : 0;
              break;
          case 'start': // Run the server
              return $this->start();
          case 'reload': // Graceful reload
              return $this->reload();
          case 'stop': // Stop the server
              return $this->stop();
          default:
              // exit("...") would print the text but end the process with status 0,
              // hiding the failure from shells and cron; report it and fail instead.
              $this->error("{$op} command does not exist");
              return 1;
      }
  }
  /**
   * Create the Swoole server on the configured host/port, apply the runtime
   * options, attach the event handlers and start it.
   */
  public function start()
  {
      date_default_timezone_set('PRC');

      // Build the server object listening on the configured address.
      $server = new \Swoole\Server($this->host, $this->port);
      $this->serv = $server;
      $server->set($this->options);

      // Event name => handler method on this command.
      $handlers = [
          'start' => 'onStart',
          'receive' => 'onReceive',
          'task' => 'onTask',
          'finish' => 'onFinish',
      ];
      foreach ($handlers as $event => $method) {
          $server->on($event, [$this, $method]);
      }

      echo "swoole start...\n";
      $server->start();
  }
  /**
   * Graceful reload: send SIGUSR1 to the manager process of a running server.
   *
   * The PID file holds the master PID on its first line and the manager PID on
   * its second line (written by onStart()).
   *
   * @return false|null false when either PID is missing or invalid, the master
   *                    is not running, or the signal could not be delivered;
   *                    null once the signal has been sent.
   */
  public function reload()
  {
      $raw = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
      $pids = $raw ? explode("\n", $raw) : [];
      $masterPid = isset($pids[0]) ? (int) trim($pids[0]) : 0;
      $managePid = isset($pids[1]) ? (int) trim($pids[1]) : 0;

      // Both PIDs are required: the master for the liveness check, the manager
      // as the signal target. A missing or non-numeric entry must never reach kill().
      if ($masterPid <= 0 || $managePid <= 0) {
          return false;
      }
      if (!$this->status($masterPid)) {
          return false;
      }
      // Only announce the reload when the signal was actually delivered.
      if (!\Swoole\Process::kill($managePid, SIGUSR1)) {
          return false;
      }
      echo "swoole reload...\n";
  }
  /**
   * Stop a running server and remove the PID file.
   *
   * Uses crontab/swooleTaskStop.sh when it exists and exec() is available;
   * otherwise sends SIGTERM to the master process so the server is still
   * stopped instead of merely losing its PID file.
   *
   * @param bool $smooth Unused; kept so existing callers keep working.
   * @return false|null false when no valid master PID is recorded or the master
   *                    is not running; null after the stop has been requested.
   */
  public function stop($smooth = false)
  {
      $raw = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
      $pids = $raw ? explode("\n", $raw) : [];
      $masterPid = isset($pids[0]) ? (int) trim($pids[0]) : 0;
      if ($masterPid <= 0) {
          return false;
      }
      if (!$this->status($masterPid)) {
          return false;
      }

      // Preferred path: the project's stop script.
      $stoSh = base_path() . '/crontab/swooleTaskStop.sh';
      if (file_exists($stoSh) && function_exists('exec')) {
          exec("{$stoSh}");
      } else {
          // Without the script nothing used to be stopped, yet the PID file was
          // deleted and "stop" reported; ask the master to shut down directly.
          \Swoole\Process::kill($masterPid, SIGTERM);
      }

      @unlink($this->pidPath);
      echo "swoole stop...\n";
  }
  /**
   * Probe whether the master process is alive.
   *
   * When no PID is supplied, the first line of the PID file is used if the file
   * exists.
   *
   * @param mixed $masterPid PID to probe; an empty value means "read it from the PID file".
   * @return mixed false when no PID is available, otherwise the result of
   *               \Swoole\Process::kill() with signal 0.
   */
  public function status($masterPid = 0)
  {
      if (empty($masterPid) && file_exists($this->pidPath)) {
          $content = file_get_contents($this->pidPath);
          $lines = $content ? explode("\n", $content) : [];
          $masterPid = isset($lines[0]) ? $lines[0] : '';
      }

      if (!$masterPid) {
          return false;
      }

      // Signal 0 only probes the process; nothing is delivered to it.
      return \Swoole\Process::kill($masterPid, 0);
  }
  /**
   * Server "start" handler: record the master/manager PIDs, reset the log file
   * and register the periodic jobs that hand work to the task workers.
   *
   * @param \Swoole\Server $serv The server that has just started.
   */
  public function onStart($serv)
  {
      $pidDir = dirname($this->pidPath);
      if (!is_dir($pidDir)) {
          // mkdir() takes (path, mode, recursive) and the mode must be octal.
          @mkdir($pidDir, 0755, true);
      }
      // Record the process ids (master on line 1, manager on line 2) so that
      // scripts and status()/reload()/stop() can find the server.
      $pid = "{$serv->master_pid}\n{$serv->manager_pid}";
      file_put_contents($this->pidPath, $pid);

      // Tick counter shared by reference between all timers below; each timer
      // increments it and compares it against its own truncation threshold.
      $time = 0;
      $date = date('Y-m-d H:i:s');
      if (file_exists($this->options['log_file'])) {
          file_put_contents($this->options['log_file'], "Task {$date}:清空日志\n");
      }

      // Job: update the daily SBT price (every 3 minutes).
      \swoole_timer_tick(180000, function ($timer) use ($serv, &$time) {
          $date = date('Y-m-d H:i:s');
          $this->truncateLogWhenDue($time, 3600, $date);

          if (RedisService::get('caches:task:lock:sbt_loaded')) {
              echo "[Task UpdateSbtPrice {$date}] 间隔时间调用\n";
              return;
          }
          $taskData = [
              'taskName' => 'UpdateSbtPrice',
              'name' => "更新SBT每日价格",
              'date' => date('Y-m-d'),
          ];
          $res = $serv->task($taskData);
          RedisService::set('caches:task:lock:sbt_loaded', true, rand(3, 5));
          echo "[Task UpdateSbtPrice {$date}] 更新SBT每日价格:{$res}\n";
      });

      // Job: automatic pledge monitoring (every 3 minutes).
      \swoole_timer_tick(180000, function ($timer) use ($serv, &$time) {
          $date = date('Y-m-d H:i:s');
          $this->truncateLogWhenDue($time, 7200, $date);

          $userList = MemberService::make()->pledgeUserList();
          if (!$userList) {
              echo "[Task PledgeAutoTrade {$date}] 暂无可自动质押交易用户\n";
              return;
          }
          if (RedisService::get('caches:task:lock:pledge_loaded')) {
              echo "[Task PledgeAutoTrade {$date}] 间隔时间调用\n";
              return;
          }
          foreach ($userList as $item) {
              $userId = $item['id'];
              if ($userId) {
                  $taskData = [
                      'taskName' => 'PledgeAutoTrade',
                      'name' => "自动质押交易",
                      'params' => $item,
                      'date' => date('Y-m-d'),
                  ];
                  $res = $serv->task($taskData);
                  echo "[Task PledgeAutoTrade {$date}] 用户[{$userId}]自动质押交易结果:{$res}\n";
              }
          }
          RedisService::set('caches:task:lock:pledge_loaded', true, rand(5, 10));
      });

      // Job: detect pledge orders due for principal refund; after the refund
      // the order is pledged again automatically when the USDT balance allows
      // (every 3 minutes).
      \swoole_timer_tick(180000, function ($timer) use ($serv, &$time) {
          $date = date('Y-m-d H:i:s');
          $this->truncateLogWhenDue($time, 3600, $date);

          // The key that is checked must be the key that is set. Previously the
          // settle job's key was written here, so this guard never engaged and
          // the settle job was throttled instead.
          // NOTE(review): "plende" looks like a typo of "pledge"; the spelling
          // is kept so the key stays compatible with any other reader - confirm.
          $lockKey = 'caches:task:lock:plende_refund_loaded';

          $userList = PledgeOrderService::make()->getRefundOrderList();
          if (!$userList) {
              echo "[Task PledgeRefund {$date}] 间隔时间调用\n";
              return;
          }
          if (RedisService::get($lockKey)) {
              echo "[Task PledgeRefund {$date}] 质押订单到期退本调用间隔\n";
              return;
          }
          foreach ($userList as $item) {
              $orderId = $item['id'];
              $userId = $item['user_id'];
              $orderNo = $item['order_no'];
              if ($orderId) {
                  $taskData = [
                      'taskName' => 'PledgeRefund',
                      'name' => "质押订单自动退本",
                      'params' => $item,
                      'date' => date('Y-m-d'),
                  ];
                  $res = $serv->task($taskData);
                  echo "[Task PledgeRefund {$date}] 用户[{$userId}]质押订单[{$orderNo}]到期退本:{$res}\n";
              }
          }
          RedisService::set($lockKey, true, rand(3, 5));
      });

      // Job: settle matured pledge orders and pay out rewards - pledge income,
      // referral, management and same-level (based on management) rewards
      // (every 120 seconds).
      \swoole_timer_tick(120000, function ($timer) use ($serv, &$time) {
          $date = date('Y-m-d H:i:s');
          $this->truncateLogWhenDue($time, 3600, $date);

          $userList = PledgeOrderService::make()->getSettleOrderList();
          if (!$userList) {
              echo "[Task PledgeSettle {$date}] 间隔时间调用\n";
              return;
          }
          if (RedisService::get('caches:task:lock:pledge_settle_loaded')) {
              echo "[Task PledgeSettle {$date}] 质押订单到期结算调用间隔\n";
              return;
          }
          foreach ($userList as $item) {
              $orderId = $item['id'];
              $userId = $item['user_id'];
              $orderNo = $item['order_no'];
              if ($orderId) {
                  $taskData = [
                      'taskName' => 'PledgeSettle',
                      'name' => "质押订单到期结算",
                      'params' => $item,
                      'date' => date('Y-m-d'),
                  ];
                  $res = $serv->task($taskData);
                  echo "[Task PledgeSettle {$date}] 用户[{$userId}]质押订单[{$orderNo}]到期结算:{$res}\n";
              }
          }
          RedisService::set('caches:task:lock:pledge_settle_loaded', true, rand(3, 5));
      });

      // Job: update user levels (every 60 seconds).
      \swoole_timer_tick(60000, function ($timer) use ($serv, &$time) {
          $date = date('Y-m-d H:i:s');
          $this->truncateLogWhenDue($time, 7200, $date);

          $userList = MemberService::make()->getUpgradeUserList();
          if (!$userList) {
              echo "[Task UpgradeUpdate {$date}] 暂无需要更新等级用户\n";
              return;
          }
          if (RedisService::get('caches:task:lock:upgrade_loaded')) {
              echo "[Task UpgradeUpdate {$date}] 间隔时间调用\n";
              return;
          }
          foreach ($userList as $item) {
              $userId = $item['id'];
              if ($userId) {
                  $taskData = [
                      'taskName' => 'UpgradeUpdate',
                      'name' => "用户等级更新",
                      'params' => $item,
                      'date' => date('Y-m-d'),
                  ];
                  $taskId = $serv->task($taskData);
                  echo "[Task UpgradeUpdate-{$taskId} {$date}] 用户[{$userId}]用户等级更新处理\n";
              }
          }
          RedisService::set('caches:task:lock:upgrade_loaded', true, rand(5, 10));
      });

      // Job: settle developer maintenance income (every 3 minutes).
      \swoole_timer_tick(180000, function ($timer) use ($serv, &$time) {
          $date = date('Y-m-d H:i:s');
          $this->truncateLogWhenDue($time, 3600, $date);

          $userList = MemberService::make()->getDeveloperList();
          if (!$userList) {
              echo "[Task DeveloperSettle {$date}] 暂无需要结算收益开发者\n";
              return;
          }
          if (RedisService::get('caches:task:lock:developer_loaded')) {
              echo "[Task DeveloperSettle {$date}] 间隔时间调用\n";
              return;
          }
          foreach ($userList as $item) {
              $id = $item['id'];
              $walletUrl = $item['wallet_url'];
              if ($walletUrl) {
                  $taskData = [
                      'taskName' => 'DeveloperSettle',
                      'name' => "开发者维护收益结算",
                      'params' => $item,
                      'date' => date('Y-m-d'),
                  ];
                  $taskId = $serv->task($taskData);
                  echo "[Task DeveloperSettle-{$taskId} {$date}] 开发者[{$id}]用户维护收益结算处理\n";
              }
          }
          RedisService::set('caches:task:lock:developer_loaded', true, rand(5, 10));
      });
  }

  /**
   * Truncate the log file once the shared tick counter has passed $limit, then
   * count the current tick.
   *
   * @param int $ticks Shared tick counter; reset to 0 on truncation, then incremented.
   * @param int $limit Threshold the counter must exceed before the log is truncated.
   * @param string $date Timestamp written into the truncation marker line.
   * @return void
   */
  private function truncateLogWhenDue(&$ticks, $limit, $date)
  {
      $logFile = $this->options['log_file'];
      if ($ticks > $limit && file_exists($logFile)) {
          $ticks = 0;
          file_put_contents($logFile, "Task {$date}:清空日志\n");
      }
      $ticks++;
  }
  /**
   * Connection-opened handler: greet the client with its own descriptor number.
   *
   * NOTE(review): start() does not register this method for the "connect"
   * event - confirm whether it is meant to be wired up.
   *
   * @param mixed $serv Server object; only send() is used.
   * @param mixed $fd Descriptor of the connection that was opened.
   * @param mixed $from_id Unused.
   */
  public function onConnect($serv, $fd, $from_id)
  {
      $greeting = "Success {$fd}!";
      $serv->send($fd, $greeting);
  }
  /**
   * Data-received handler: log the payload, acknowledge it, then queue it as a task.
   *
   * NOTE(review): the payload is forwarded to task() as received; onTask() reads
   * 'taskName'/'params' keys from its data - confirm what clients are expected
   * to send.
   *
   * @param \Swoole\Server $serv
   * @param mixed $fd Descriptor of the sending connection.
   * @param mixed $from_id Unused.
   * @param mixed $data Payload received from the client.
   */
  public function onReceive(\Swoole\Server $serv, $fd, $from_id, $data)
  {
      echo "Get Message From Client {$fd}:{$data}\n";

      // Reply to the client synchronously...
      $ack = json_encode(['result' => 'success']);
      $serv->send($fd, $ack);

      // ...then run the work asynchronously in a task worker.
      $serv->task($data);
  }
  /**
   * Task handler: run the job named by $data['taskName'] and log its outcome.
   *
   * @param \Swoole\Server $serv
   * @param mixed $task_id Task id; used only in log lines.
   * @param mixed $from_id Unused.
   * @param mixed $data Job description; 'taskName' selects the job and 'params'
   *                    (optional) carries its arguments.
   * @return false|string false when UpdateSbtPrice is requested at or after
   *                      06:00; the exception message when the job throws;
   *                      otherwise the fixed string '暂无任务处理'.
   */
  public function onTask(\Swoole\Server $serv, $task_id, $from_id, $data)
  {
      $date = date('Y-m-d H:i:s');
      $taskName = isset($data['taskName']) ? $data['taskName'] : '';
      $params = isset($data['params']) ? $data['params'] : [];
      // Common prefix of every log line written for this task.
      $tag = "[Task {$taskName}-{$task_id} {$date}]";

      try {
          switch ($taskName) {
              case 'UpdateSbtPrice': // Update the daily SBT price
                  // Time restriction: the job only runs before 06:00.
                  if (date('H:i') >= '06:00') {
                      echo "{$tag} 不在运行时间段内\n";
                      return false;
                  }
                  if ($res = PriceLogService::make()->updateSbtPrice()) {
                      $res = $this->describeTaskResult($res);
                      echo "{$tag} 更新SBT每日价格:{$res}\n";
                  } else {
                      $error = $this->describeTaskError(PriceLogService::make()->getError());
                      echo "{$tag} 更新SBT每日价格:{$error}\n";
                  }
                  break;

              case 'PledgeAutoTrade': // Automatic pledge handling
                  $userId = isset($params['id']) ? $params['id'] : 0;
                  if ($res = PledgeOrderService::make()->autoMakeOrder($params)) {
                      $res = $this->describeTaskResult($res);
                      echo "{$tag} 用户[{$userId}]自动质押交易处理结果:{$res}\n";
                  } else {
                      $error = $this->describeTaskError(PledgeOrderService::make()->getError());
                      echo "{$tag} 用户[{$userId}]自动质押交易处理结果:{$error}\n";
                  }
                  break;

              case 'PledgeRefund': // Refund the principal of a pledge order, then re-pledge automatically
                  $userId = isset($params['user_id']) ? $params['user_id'] : 0;
                  $orderId = isset($params['id']) ? $params['id'] : 0;
                  $orderNo = isset($params['order_no']) ? $params['order_no'] : '';
                  if ($res = PledgeOrderService::make()->refund($orderId, $orderNo, $userId)) {
                      $res = $this->describeTaskResult($res);
                      echo "{$tag} 用户[{$userId}]质押订单[{$orderNo}]退本处理成功:{$res}\n";
                  } else {
                      $error = $this->describeTaskError(PledgeOrderService::make()->getError());
                      echo "{$tag} 用户[{$userId}]质押订单[{$orderNo}]退本处理失败:{$error}\n";
                  }
                  break;

              case 'PledgeSettle': // Settle a matured pledge order together with income and rewards
                  $userId = isset($params['user_id']) ? $params['user_id'] : 0;
                  $orderId = isset($params['id']) ? $params['id'] : 0;
                  $orderNo = isset($params['order_no']) ? $params['order_no'] : '';
                  if ($res = PledgeOrderService::make()->orderSettle($orderId, $orderNo, $userId)) {
                      $res = $this->describeTaskResult($res);
                      echo "{$tag} 用户[{$userId}]质押订单[{$orderNo}]收益结算成功:{$res}\n";
                  } else {
                      $error = $this->describeTaskError(PledgeOrderService::make()->getError());
                      echo "{$tag} 用户[{$userId}]质押订单[{$orderNo}]收益结算失败:{$error}\n";
                  }
                  break;

              case 'UpgradeUpdate': // Update a user's level
                  $userId = isset($params['id']) ? $params['id'] : 0;
                  if ($res = MemberService::make()->upgradeUpdate($userId)) {
                      $res = $this->describeTaskResult($res);
                      echo "{$tag} 用户[{$userId}]等级更新处理结果:{$res}\n";
                  } else {
                      $error = $this->describeTaskError(MemberService::make()->getError());
                      echo "{$tag} 用户[{$userId}]等级更新处理结果:{$error}\n";
                  }
                  break;

              case 'DeveloperSettle': // Settle developer maintenance income
                  $id = isset($params['id']) ? $params['id'] : 0;
                  $walletUrl = isset($params['wallet_url']) ? $params['wallet_url'] : '';
                  // NOTE(review): "dveloperSettle" is the name this file calls on
                  // FinanceService; it looks misspelled but is defined elsewhere.
                  if ($res = FinanceService::make()->dveloperSettle($id, $walletUrl)) {
                      $res = $this->describeTaskResult($res);
                      echo "{$tag} 开发者[{$id}]维护收益结算处理结果:{$res}\n";
                  } else {
                      // Read the error from the service that was called; this
                      // branch used to ask MemberService, like every other branch
                      // asks its own service.
                      $error = $this->describeTaskError(FinanceService::make()->getError());
                      echo "{$tag} 开发者[{$id}]维护收益结算处理结果:{$error}\n";
                  }
                  break;
          }
      } catch (\Exception $exception) {
          // Leave a trace in the log instead of failing silently.
          $message = $exception->getMessage();
          echo "{$tag} 处理异常:{$message}\n";
          return $message;
      }

      return '暂无任务处理';
  }

  /**
   * Render a truthy service result for the log.
   *
   * @param mixed $res Value returned by the service call.
   * @return string JSON (unicode left unescaped) for a non-empty array,
   *                otherwise the fixed text '处理成功'.
   */
  private function describeTaskResult($res)
  {
      return is_array($res) && $res ? json_encode($res, JSON_UNESCAPED_UNICODE) : '处理成功';
  }

  /**
   * Render a service error for the log.
   *
   * @param mixed $error Value returned by the service's getError().
   * @return string The error passed through lang(), or '处理失败' when it is empty.
   */
  private function describeTaskError($error)
  {
      return $error ? lang($error) : '处理失败';
  }
  /**
   * Task-finished handler: log that the task has completed.
   *
   * @param \Swoole\Server $serv Server object (unused).
   * @param int $task_id Id of the finished task.
   * @param string $data Data returned by the task (unused).
   */
  public function onFinish(\Swoole\Server $serv, $task_id, $data)
  {
      echo '任务[' . $task_id . "]处理完成...\n";
  }
  /**
   * Connection-closed handler: log which client disconnected.
   *
   * The earlier $serv->close() call was removed: it omitted the descriptor
   * argument, and this handler is a connection-closed callback, so there is
   * nothing left to close here.
   *
   * NOTE(review): start() does not register this method for the "close"
   * event - confirm whether it is meant to be wired up.
   *
   * @param mixed $serv Server object (unused).
   * @param mixed $fd Descriptor of the connection that was closed.
   * @param mixed $from_id Unused.
   */
  public function onClose($serv, $fd, $from_id)
  {
      echo "Client {$fd} close connection\n";
  }
  503. }