SwooleTask.php 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Services\Api\MemberService;
  4. use App\Services\Api\PledgeOrderService;
  5. use App\Services\Api\PriceLogService;
  6. use App\Services\RedisService;
  7. use Illuminate\Console\Command;
  8. use Illuminate\Support\Facades\DB;
  9. class SwooleTask extends Command
  10. {
  11. protected $serv;
  12. protected $host = '127.0.0.1';
  13. protected $port = 6630;
  14. // 进程名称
  15. protected $taskName = 'swooleTask';
  16. // PID路径
  17. protected $pidPath = '/storage/swoole.pid';
  18. // task
  19. protected $onlyReloadTaskWorker = false;
  20. // 设置运行时参数
  21. protected $options = [
  22. 'worker_num' => 8, //worker进程数,一般设置为CPU数的1-4倍
  23. 'daemonize' => true, //启用守护进程
  24. 'log_file' => '/storage/logs/swoole-task.log', //指定swoole错误日志文件
  25. 'log_level' => 0, //日志级别 范围是0-5,0-DEBUG,1-TRACE,2-INFO,3-NOTICE,4-WARNING,5-ERROR
  26. 'dispatch_mode' => 1, //数据包分发策略,1-轮询模式
  27. 'task_worker_num' => 6, //task进程的数量
  28. 'task_ipc_mode' => 3, //使用消息队列通信,并设置为争抢模式
  29. ];
  30. /**
  31. * The name and signature of the console command.
  32. *
  33. * @var string
  34. */
  35. protected $signature = 'swoole:task {op}';
  36. /**
  37. * The console command description.
  38. *
  39. * @var string
  40. */
  41. protected $description = 'Swoole task server description';
  42. /**
  43. * Create a new command instance.
  44. *
  45. * @return void
  46. */
  47. public function __construct()
  48. {
  49. parent::__construct();
  50. }
  51. /**
  52. * 入口
  53. * Execute the console command.
  54. *
  55. * @return mixed
  56. */
  57. public function handle()
  58. {
  59. ini_set("default_socket_timeout", -1);
  60. // 项目根目录
  61. defined('ROOT_PATH') or define('ROOT_PATH', base_path());
  62. // 文件上传目录
  63. defined('ATTACHMENT_PATH') or define('ATTACHMENT_PATH', base_path('public/uploads'));
  64. // 图片上传目录
  65. defined('IMG_PATH') or define('IMG_PATH', base_path('public/uploads/images'));
  66. // 临时存放目录
  67. defined('UPLOAD_TEMP_PATH') or define('UPLOAD_TEMP_PATH', ATTACHMENT_PATH . "/temp");
  68. // 定义普通图片域名
  69. defined('IMG_URL') or define('IMG_URL', env('IMG_URL'));
  70. // 数据表前缀
  71. defined('DB_PREFIX') or define('DB_PREFIX', DB::connection()->getTablePrefix());
  72. $this->options['log_file'] = base_path() . $this->options['log_file'];
  73. $this->pidPath = base_path() . $this->pidPath;
  74. $op = $this->argument('op');
  75. switch ($op) {
  76. case 'status': // 状态
  77. $res = $this->status();
  78. echo $res ? $res : 0;
  79. break;
  80. case 'start': // 运行
  81. return $this->start();
  82. break;
  83. case 'reload': // 平滑重启
  84. return $this->reload();
  85. break;
  86. case 'stop': // 停止运行
  87. return $this->stop();
  88. break;
  89. default:
  90. exit("{$op} command does not exist");
  91. break;
  92. }
  93. }
  94. /**
  95. * 启动
  96. */
  97. public function start()
  98. {
  99. date_default_timezone_set('PRC');
  100. // 构建Server对象,监听对应地址
  101. $this->serv = new \Swoole\Server($this->host, $this->port);
  102. $this->serv->set($this->options);
  103. // 注册事件
  104. $this->serv->on('start', [$this, 'onStart']);
  105. $this->serv->on('receive', [$this, 'onReceive']);
  106. $this->serv->on('task', [$this, 'onTask']);
  107. $this->serv->on('finish', [$this, 'onFinish']);
  108. // Run worker
  109. echo "swoole start...\n";
  110. $this->serv->start();
  111. }
  112. // 安全重启
  113. public function reload()
  114. {
  115. $pids = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
  116. $pids = $pids ? explode("\n", $pids) : [];
  117. $masterPid = isset($pids[0]) ? $pids[0] : '';
  118. $managePid = isset($pids[1]) ? $pids[1] : '';
  119. if (empty($masterPid)) {
  120. return false;
  121. }
  122. if (!$this->status($masterPid)) {
  123. return false;
  124. }
  125. \Swoole\Process::kill($managePid, SIGUSR1);
  126. echo "swoole reload...\n";
  127. }
  128. /**
  129. * 停止
  130. * @param bool $smooth
  131. * @return bool
  132. */
  133. public function stop($smooth = false)
  134. {
  135. $pids = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
  136. $pids = $pids ? explode("\n", $pids) : [];
  137. $masterPid = isset($pids[0]) ? $pids[0] : '';
  138. $managePid = isset($pids[1]) ? $pids[1] : '';
  139. if (empty($masterPid)) {
  140. return false;
  141. }
  142. if (!$this->status($masterPid)) {
  143. return false;
  144. }
  145. // 直接杀
  146. $stoSh = base_path() . '/crontab/swooleTaskStop.sh';
  147. if (file_exists($stoSh) && function_exists('exec')) {
  148. exec("{$stoSh}");
  149. }
  150. @unlink($this->pidPath);
  151. echo "swoole stop...\n";
  152. }
  153. /**
  154. * 状态
  155. * @return mixed
  156. */
  157. public function status($masterPid = 0)
  158. {
  159. $res = false;
  160. if (empty($masterPid) && file_exists($this->pidPath)) {
  161. $pids = file_get_contents($this->pidPath);
  162. $pids = $pids ? explode("\n", $pids) : [];
  163. $masterPid = isset($pids[0]) ? $pids[0] : '';
  164. }
  165. if ($masterPid) {
  166. $res = \Swoole\Process::kill($masterPid, 0);
  167. }
  168. return $res;
  169. }
  170. public function onStart($serv)
  171. {
  172. if (!is_dir(dirname($this->pidPath))) {
  173. @mkdir(dirname($this->pidPath), true, 755);
  174. }
  175. //记录进程id,脚本实现自动重启
  176. $pid = "{$serv->master_pid}\n{$serv->manager_pid}";
  177. file_put_contents($this->pidPath, $pid);
  178. // 定时任务
  179. $time = 0;
  180. $date = date('Y-m-d H:i:s');
  181. if (file_exists($this->options['log_file'])) {
  182. $time = 0;
  183. file_put_contents($this->options['log_file'], "Task {$date}:清空日志\n");
  184. }
  185. // TODO 更新SBT每日价格
  186. \swoole_timer_tick(180000, function ($timer) use ($serv, &$time) { // 启用定时器,每3分钟执行一次
  187. $date = date('Y-m-d H:i:s');
  188. if ($time > 3600 && file_exists($this->options['log_file'])) {
  189. $time = 0;
  190. file_put_contents($this->options['log_file'], "Task {$date}:清空日志\n");
  191. }
  192. $time++;
  193. if (!RedisService::get('caches:task:lock:sbt_loaded')) {
  194. $taskData = [
  195. 'taskName' => 'UpdateSbtPrice',
  196. 'name' => "更新SBT每日价格",
  197. 'date' => date('Y-m-d'),
  198. ];
  199. $res = $serv->task($taskData);
  200. RedisService::set('caches:task:lock:sbt_loaded', true, rand(3, 5));
  201. echo "[Task UpdateSbtPrice {$date}] 更新SBT每日价格:{$res}\n";
  202. } else {
  203. echo "[Task UpdateSbtPrice {$date}] 间隔时间调用\n";
  204. }
  205. });
  206. // TODO 自动质押监控
  207. \swoole_timer_tick(180000, function ($timer) use ($serv, &$time) { // 启用定时器,每3分钟执行一次
  208. $date = date('Y-m-d H:i:s');
  209. if ($time > 7200 && file_exists($this->options['log_file'])) {
  210. $time = 0;
  211. file_put_contents($this->options['log_file'], "Task {$date}:清空日志\n");
  212. }
  213. $time++;
  214. $userList = MemberService::make()->pledgeUserList();
  215. if ($userList) {
  216. if (!RedisService::get('caches:task:lock:pledge_loaded')) {
  217. foreach ($userList as $item) {
  218. $userId = $item['id'];
  219. if ($userId) {
  220. $taskData = [
  221. 'taskName' => 'PledgeAutoTrade',
  222. 'name' => "自动质押交易",
  223. 'params' => $item,
  224. 'date' => date('Y-m-d'),
  225. ];
  226. $res = $serv->task($taskData);
  227. echo "[Task PledgeAutoTrade {$date}] 用户[{$userId}]自动质押交易结果:{$res}\n";
  228. }
  229. }
  230. RedisService::set('caches:task:lock:pledge_loaded', true, rand(5, 10));
  231. } else {
  232. echo "[Task PledgeAutoTrade {$date}] 间隔时间调用\n";
  233. }
  234. } else {
  235. echo "[Task PledgeAutoTrade {$date}] 暂无可自动质押交易用户\n";
  236. }
  237. });
  238. // TODO 检测质押订单退本,到期退本后(USDT余额足够)自动质押
  239. \swoole_timer_tick(180000, function ($timer) use ($serv, &$time) { // 启用定时器,每3分钟执行一次
  240. $date = date('Y-m-d H:i:s');
  241. if ($time > 3600 && file_exists($this->options['log_file'])) {
  242. $time = 0;
  243. file_put_contents($this->options['log_file'], "Task {$date}:清空日志\n");
  244. }
  245. $time++;
  246. $userList = PledgeOrderService::make()->getRefundOrderList();
  247. if ($userList) {
  248. if (!RedisService::get('caches:task:lock:plende_refund_loaded')) {
  249. foreach ($userList as $item) {
  250. $orderId = $item['id'];
  251. $userId = $item['user_id'];
  252. $orderNo = $item['order_no'];
  253. if ($orderId) {
  254. $taskData = [
  255. 'taskName' => 'PledgeRefund',
  256. 'name' => "质押订单自动退本",
  257. 'params'=> $item,
  258. 'date' => date('Y-m-d'),
  259. ];
  260. $res = $serv->task($taskData);
  261. echo "[Task PledgeRefund {$date}] 用户[{$userId}]质押订单[{$orderNo}]到期退本:{$res}\n";
  262. }
  263. }
  264. RedisService::set('caches:task:lock:pledge_settle_loaded', true, rand(3, 5));
  265. }else{
  266. echo "[Task PledgeRefund {$date}] 质押订单到期退本调用间隔\n";
  267. }
  268. } else {
  269. echo "[Task PledgeRefund {$date}] 间隔时间调用\n";
  270. }
  271. });
  272. // TODO 质押订单到期结算收益,同时发放奖励(质押收益、推荐奖、管理奖、平级奖-基于管理奖)
  273. \swoole_timer_tick(120000, function ($timer) use ($serv, &$time) { // 启用定时器,每120秒执行一次
  274. $date = date('Y-m-d H:i:s');
  275. if ($time > 3600 && file_exists($this->options['log_file'])) {
  276. $time = 0;
  277. file_put_contents($this->options['log_file'], "Task {$date}:清空日志\n");
  278. }
  279. $time++;
  280. $userList = PledgeOrderService::make()->getSettleOrderList();
  281. if ($userList) {
  282. if (!RedisService::get('caches:task:lock:pledge_settle_loaded')) {
  283. foreach ($userList as $item) {
  284. $orderId = $item['id'];
  285. $userId = $item['user_id'];
  286. $orderNo = $item['order_no'];
  287. if ($orderId) {
  288. $taskData = [
  289. 'taskName' => 'PledgeSettle',
  290. 'name' => "质押订单到期结算",
  291. 'params'=> $item,
  292. 'date' => date('Y-m-d'),
  293. ];
  294. $res = $serv->task($taskData);
  295. echo "[Task PledgeSettle {$date}] 用户[{$userId}]质押订单[{$orderNo}]到期结算:{$res}\n";
  296. }
  297. }
  298. RedisService::set('caches:task:lock:pledge_settle_loaded', true, rand(3, 5));
  299. }else{
  300. echo "[Task PledgeSettle {$date}] 质押订单到期结算调用间隔\n";
  301. }
  302. } else {
  303. echo "[Task PledgeSettle {$date}] 间隔时间调用\n";
  304. }
  305. });
  306. // TODO 更新用户等级
  307. \swoole_timer_tick(60000, function ($timer) use ($serv, &$time) { // 启用定时器,每5分钟执行一次
  308. $date = date('Y-m-d H:i:s');
  309. if ($time > 7200 && file_exists($this->options['log_file'])) {
  310. $time = 0;
  311. file_put_contents($this->options['log_file'], "Task {$date}:清空日志\n");
  312. }
  313. $time++;
  314. $userList = MemberService::make()->getUpgradeUserList();
  315. if ($userList) {
  316. if (!RedisService::get('caches:task:lock:upgrade_loaded')) {
  317. foreach ($userList as $item) {
  318. $userId = $item['id'];
  319. if ($userId) {
  320. $taskData = [
  321. 'taskName' => 'UpgradeUpdate',
  322. 'name' => "用户等级更新",
  323. 'params' => $item,
  324. 'date' => date('Y-m-d'),
  325. ];
  326. $taskId = $serv->task($taskData);
  327. echo "[Task UpgradeUpdate-{$taskId} {$date}] 用户[{$userId}]用户等级更新处理\n";
  328. }
  329. }
  330. RedisService::set('caches:task:lock:upgrade_loaded', true, rand(5, 10));
  331. } else {
  332. echo "[Task UpgradeUpdate {$date}] 间隔时间调用\n";
  333. }
  334. } else {
  335. echo "[Task UpgradeUpdate {$date}] 暂无需要更新等级用户\n";
  336. }
  337. });
  338. }
  339. //监听连接进入事件
  340. public function onConnect($serv, $fd, $from_id)
  341. {
  342. $serv->send($fd, "Success {$fd}!");
  343. }
  344. // 监听数据接收事件
  345. public function onReceive(\Swoole\Server $serv, $fd, $from_id, $data)
  346. {
  347. echo "Get Message From Client {$fd}:{$data}\n";
  348. $res['result'] = 'success';
  349. $serv->send($fd, json_encode($res)); // 同步返回消息给客户端
  350. $serv->task($data); // 执行异步任务
  351. }
  352. /**
  353. * @param \Swoole\Server $serv
  354. * @param $task_id
  355. * @param $from_id
  356. * @param $data
  357. * @return false|string
  358. */
  359. public function onTask(\Swoole\Server $serv, $task_id, $from_id, $data)
  360. {
  361. $date = date('Y-m-d H:i:s');
  362. $taskName = isset($data['taskName']) ? $data['taskName'] : '';
  363. $params = isset($data['params']) ? $data['params'] : [];
  364. try {
  365. switch ($taskName) {
  366. case 'UpdateSbtPrice': // 更新SBT每日价格
  367. // 时间限制
  368. if (date('H:i') >= '06:00') {
  369. echo "[Task {$taskName}-{$task_id} {$date}] 不在运行时间段内\n";
  370. return false;
  371. }
  372. // 调用处理
  373. if ($res = PriceLogService::make()->updateSbtPrice()) {
  374. $res = is_array($res) && $res ? json_encode($res, 256) : '处理成功';
  375. echo "[Task {$taskName}-{$task_id} {$date}] 更新SBT每日价格:{$res}\n";
  376. } else {
  377. $error = PriceLogService::make()->getError();
  378. $error = $error ? lang($error) : '处理失败';
  379. echo "[Task {$taskName}-{$task_id} {$date}] 更新SBT每日价格:{$error}\n";
  380. }
  381. break;
  382. case 'PledgeAutoTrade': // 自动质押处理
  383. // 调用处理
  384. $userId = isset($params['id'])? $params['id'] : 0;
  385. if ($res = PledgeOrderService::make()->autoMakeOrder($params)) {
  386. $res = is_array($res) && $res ? json_encode($res, 256) : '处理成功';
  387. echo "[Task {$taskName}-{$task_id} {$date}] 用户[{$userId}]自动质押交易处理结果:{$res}\n";
  388. } else {
  389. $error = PledgeOrderService::make()->getError();
  390. $error = $error ? lang($error) : '处理失败';
  391. echo "[Task {$taskName}-{$task_id} {$date}] 用户[{$userId}]自动质押交易处理结果:{$error}\n";
  392. }
  393. break;
  394. case 'PledgeRefund': // 质押订单退本,退本后自动再质押处理
  395. // 调用处理
  396. $userId = isset($params['user_id'])? $params['user_id'] : 0;
  397. $orderId = isset($params['id'])? $params['id'] : 0;
  398. $orderNo = isset($params['order_no'])? $params['order_no'] : '';
  399. if ($res = PledgeOrderService::make()->refund($orderId,$orderNo, $userId)) {
  400. $res = is_array($res) && $res ? json_encode($res, 256) : '处理成功';
  401. echo "[Task {$taskName}-{$task_id} {$date}] 用户[{$userId}]质押订单[{$orderNo}]退本处理成功:{$res}\n";
  402. } else {
  403. $error = PledgeOrderService::make()->getError();
  404. $error = $error ? lang($error) : '处理失败';
  405. echo "[Task {$taskName}-{$task_id} {$date}] 用户[{$userId}]质押订单[{$orderNo}]退本处理失败:{$error}\n";
  406. }
  407. break;
  408. case 'PledgeSettle': // 质押订单到期结算,和收益、奖励结算
  409. // 调用处理
  410. $userId = isset($params['user_id'])? $params['user_id'] : 0;
  411. $orderId = isset($params['id'])? $params['id'] : 0;
  412. $orderNo = isset($params['order_no'])? $params['order_no'] : '';
  413. if ($res = PledgeOrderService::make()->orderSettle($orderId,$orderNo,$userId)) {
  414. $res = is_array($res) && $res ? json_encode($res, 256) : '处理成功';
  415. echo "[Task {$taskName}-{$task_id} {$date}] 用户[{$userId}]质押订单[{$orderNo}]收益结算成功:{$res}\n";
  416. } else {
  417. $error = PledgeOrderService::make()->getError();
  418. $error = $error ? lang($error) : '处理失败';
  419. echo "[Task {$taskName}-{$task_id} {$date}] 用户[{$userId}]质押订单[{$orderNo}]收益结算失败:{$error}\n";
  420. }
  421. break;
  422. case 'UpgradeUpdate': // 用户等级更新
  423. // 调用处理
  424. $userId = isset($params['id'])? $params['id'] : 0;
  425. if ($res = MemberService::make()->upgradeUpdate($userId)) {
  426. $res = is_array($res) && $res ? json_encode($res, 256) : '处理成功';
  427. echo "[Task {$taskName}-{$task_id} {$date}] 用户[{$userId}]等级更新处理结果:{$res}\n";
  428. } else {
  429. $error = MemberService::make()->getError();
  430. $error = $error ? lang($error) : '处理失败';
  431. echo "[Task {$taskName}-{$task_id} {$date}] 用户[{$userId}]等级更新处理结果:{$error}\n";
  432. }
  433. break;
  434. }
  435. } catch (\Exception $exception) {
  436. return $exception->getMessage();
  437. }
  438. return '暂无任务处理';
  439. }
  440. /**
  441. * @param $serv swoole_server swoole_server对象
  442. * @param $task_id int 任务id
  443. * @param $data string 任务返回的数据
  444. */
  445. public function onFinish(\Swoole\Server $serv, $task_id, $data)
  446. {
  447. //
  448. echo "任务[{$task_id}]处理完成...\n";
  449. }
  450. // 监听连接关闭事件
  451. public function onClose($serv, $fd, $from_id)
  452. {
  453. echo "Client {$fd} close connection\n";
  454. $serv->close();
  455. }
  456. }