SwooleTask.php 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Services\Api\FinanceService;
  4. use App\Services\Api\MemberService;
  5. use App\Services\Api\PledgeOrderService;
  6. use App\Services\Api\PriceLogService;
  7. use App\Services\Common\CurrencyService;
  8. use App\Services\Common\TickerService;
  9. use App\Services\RedisService;
  10. use App\Services\WalletService;
  11. use Illuminate\Console\Command;
  12. use Illuminate\Support\Facades\DB;
  13. class SwooleTask extends Command
  14. {
  /** @var \Swoole\Server|null Server instance; assigned in start(). */
  protected $serv;

  /** @var string Address passed to the Swoole\Server constructor. */
  protected $host = '127.0.0.1';

  /** @var int Port passed to the Swoole\Server constructor. */
  protected $port = 6660;

  /** @var string Process name. */
  protected $taskName = 'swooleTask';

  /** @var string PID file path; handle() prefixes it with base_path(). */
  protected $pidPath = '/storage/swoole.pid';

  /** @var bool Task-related reload flag (presumably "reload task workers only" - confirm). */
  protected $onlyReloadTaskWorker = false;

  /**
   * Runtime settings handed to Swoole\Server::set().
   *
   * @var array
   */
  protected $options = [
      // Number of worker processes; usually 1-4x the CPU count.
      'worker_num' => 8,
      // Run as a daemon.
      'daemonize' => true,
      // Swoole error log file; handle() prefixes it with base_path().
      'log_file' => '/storage/logs/swoole-task.log',
      // Log level, range 0-5: 0-DEBUG, 1-TRACE, 2-INFO, 3-NOTICE, 4-WARNING, 5-ERROR.
      'log_level' => 0,
      // Packet dispatch strategy; 1 = round-robin.
      'dispatch_mode' => 1,
      // Number of task processes.
      'task_worker_num' => 6,
      // Communicate through a message queue, in contention mode.
      'task_ipc_mode' => 3,
  ];

  /**
   * The name and signature of the console command.
   *
   * @var string
   */
  protected $signature = 'swoole:task {op}';

  /**
   * The console command description.
   *
   * @var string
   */
  protected $description = 'Swoole task server description';
  /**
   * Build the command instance; only the parent constructor is run.
   */
  public function __construct()
  {
      parent::__construct();
  }
  /**
   * Console entry point.
   *
   * Defines the project path constants (only when not already defined),
   * turns the relative log and PID paths into absolute ones, then dispatches
   * on the {op} argument: status, start, reload or stop.
   *
   * @return mixed Whatever the selected operation returns; 1 for an unknown
   *               operation so the command exits with a failure status.
   */
  public function handle()
  {
      // -1 disables the timeout for socket-based streams.
      ini_set("default_socket_timeout", -1);

      // Project root directory.
      if (!defined('ROOT_PATH')) {
          define('ROOT_PATH', base_path());
      }
      // File upload directory.
      if (!defined('ATTACHMENT_PATH')) {
          define('ATTACHMENT_PATH', base_path('public/uploads'));
      }
      // Image upload directory.
      if (!defined('IMG_PATH')) {
          define('IMG_PATH', base_path('public/uploads/images'));
      }
      // Temporary storage directory.
      if (!defined('UPLOAD_TEMP_PATH')) {
          define('UPLOAD_TEMP_PATH', ATTACHMENT_PATH . "/temp");
      }
      // Domain used for ordinary images.
      if (!defined('IMG_URL')) {
          define('IMG_URL', env('IMG_URL'));
      }
      // Database table prefix.
      if (!defined('DB_PREFIX')) {
          define('DB_PREFIX', DB::connection()->getTablePrefix());
      }

      // Both paths are declared relative to the project root.
      $this->options['log_file'] = base_path() . $this->options['log_file'];
      $this->pidPath = base_path() . $this->pidPath;

      $op = $this->argument('op');
      switch ($op) {
          case 'status':
              // Prints the truthy status() result, or 0 when not running.
              $res = $this->status();
              echo $res ? $res : 0;
              break;
          case 'start':
              return $this->start();
          case 'reload':
              // Graceful restart.
              return $this->reload();
          case 'stop':
              return $this->stop();
          default:
              // exit("...") would terminate with status 0 and skip the
              // framework's shutdown; report the error and fail instead.
              $this->error("{$op} command does not exist");
              return 1;
      }
  }
  /**
   * Create the Swoole server on the configured host/port, apply the runtime
   * options, bind the event callbacks and start serving.
   */
  public function start()
  {
      date_default_timezone_set('PRC');

      // Build the server object listening on the configured address.
      $this->serv = new \Swoole\Server($this->host, $this->port);
      $server = $this->serv;
      $server->set($this->options);

      // Event name => handler method on this command.
      $handlers = [
          'start' => 'onStart',
          'receive' => 'onReceive',
          'task' => 'onTask',
          'finish' => 'onFinish',
      ];
      foreach ($handlers as $event => $method) {
          $server->on($event, [$this, $method]);
      }

      // Run the server.
      echo "swoole start...\n";
      $server->start();
  }
  /**
   * Graceful reload: signal the running server so it restarts its workers.
   *
   * The PID file holds "master\nmanager". The signal goes to the manager
   * process when its PID is recorded and to the master process otherwise,
   * so a truncated PID file no longer passes an empty PID to kill().
   * SIGUSR2 (task workers only) is used when $onlyReloadTaskWorker is set,
   * SIGUSR1 (all workers) otherwise.
   *
   * @return false|null false when no live server is found or the signal
   *                    cannot be delivered; null after the signal was sent.
   */
  public function reload()
  {
      $raw = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
      $pids = $raw ? explode("\n", trim($raw)) : [];
      // Cast so stray whitespace or junk can never reach kill() as a string.
      $masterPid = isset($pids[0]) ? (int) $pids[0] : 0;
      $managerPid = isset($pids[1]) ? (int) $pids[1] : 0;

      if ($masterPid <= 0 || !$this->status($masterPid)) {
          return false;
      }

      $targetPid = $managerPid > 0 ? $managerPid : $masterPid;
      $signal = $this->onlyReloadTaskWorker ? SIGUSR2 : SIGUSR1;
      if (!\Swoole\Process::kill($targetPid, $signal)) {
          return false;
      }

      echo "swoole reload...\n";
  }
  /**
   * Stop the running server.
   *
   * By default the project's stop script (crontab/swooleTaskStop.sh) is
   * executed, as before. When $smooth is true, or the script is missing or
   * exec() is unavailable, SIGTERM is sent to the master process instead.
   * Previously that case stopped nothing, yet still removed the PID file and
   * printed the stop message, leaving the server running untracked.
   *
   * @param bool $smooth Prefer SIGTERM to the master process over the script.
   * @return false|null false when no live server is found or it could not be
   *                    signalled; null after the stop was issued.
   */
  public function stop($smooth = false)
  {
      $raw = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
      $pids = $raw ? explode("\n", trim($raw)) : [];
      $masterPid = isset($pids[0]) ? (int) $pids[0] : 0;

      if ($masterPid <= 0 || !$this->status($masterPid)) {
          return false;
      }

      $stopScript = base_path() . '/crontab/swooleTaskStop.sh';
      if (!$smooth && file_exists($stopScript) && function_exists('exec')) {
          // Kill directly through the script; quote the path for the shell.
          exec(escapeshellarg($stopScript));
      } elseif (!\Swoole\Process::kill($masterPid, SIGTERM)) {
          // Nothing was stopped: keep the PID file so the server stays tracked.
          return false;
      }

      // Best-effort cleanup; a failed unlink must not abort the command.
      @unlink($this->pidPath);
      echo "swoole stop...\n";
  }
  /**
   * Report whether the server's master process is alive.
   *
   * When no PID is supplied, the first line of the PID file is used.
   *
   * @param int|string $masterPid Master PID to probe; empty means "read the PID file".
   * @return bool Result of probing the PID with signal 0; false when no PID is known.
   */
  public function status($masterPid = 0)
  {
      if (empty($masterPid) && file_exists($this->pidPath)) {
          $contents = file_get_contents($this->pidPath);
          $lines = $contents ? explode("\n", $contents) : [];
          $masterPid = isset($lines[0]) ? $lines[0] : '';
      }

      if (!$masterPid) {
          return false;
      }

      // Signal 0 delivers nothing; it only checks that the process exists.
      return \Swoole\Process::kill($masterPid, 0);
  }
  /**
   * Swoole "start" callback: record the process ids, reset the log file and
   * register the periodic timers that dispatch work to the task workers.
   *
   * @param \Swoole\Server $serv
   * @return false Always false, as before.
   */
  public function onStart($serv)
  {
      $pidDir = dirname($this->pidPath);
      if (!is_dir($pidDir)) {
          // mkdir() takes (path, permissions, recursive). The previous call
          // passed (path, true, 755), i.e. mode 1 and a non-octal literal.
          @mkdir($pidDir, 0755, true);
      }

      // Record "master\nmanager" process ids so scripts can restart the server.
      file_put_contents($this->pidPath, "{$serv->master_pid}\n{$serv->manager_pid}");

      // Begin with a fresh log file when one already exists.
      $this->resetLogFile();

      // Tick counter shared by reference between the timers; once it passes a
      // timer's threshold, that timer resets the log file and the counter.
      $time = 0;

      $this->startUsdtPriceTimer($serv, $time);
      $this->startClearCurrencyPriceTimer($serv, $time);

      // startPledgeAutoTradeTimer() is intentionally not called: that timer sat
      // after an unconditional `return false;` and therefore never ran, and
      // onTask() has no 'PledgeAutoTrade' case to process its tasks.

      // TODO Two further timers existed only as commented-out drafts:
      //   - 'PledgeRefund', every 180000 ms: one task per order returned by
      //     PledgeOrderService::make()->getRefundOrderList() (refund principal
      //     on expiry, then pledge again when the USDT balance suffices).
      //   - 'PledgeSettle', every 120000 ms: one task per order returned by
      //     PledgeOrderService::make()->getSettleOrderList() (settle income on
      //     expiry and issue pledge income / referral / management / peer rewards).
      // NOTE(review): the refund draft tested the lock key
      // 'caches:task:lock:plende_refund_loaded' but set
      // 'caches:task:lock:pledge_settle_loaded'; fix that before enabling it.

      return false;
  }

  /**
   * Overwrite the log file with a single "log cleared" line.
   *
   * @return bool true when the file existed and was rewritten, false otherwise.
   */
  private function resetLogFile()
  {
      $logFile = $this->options['log_file'];
      if (!file_exists($logFile)) {
          return false;
      }

      $date = date('Y-m-d H:i:s');
      file_put_contents($logFile, "Task {$date}:清空日志\n");

      return true;
  }

  /**
   * Every 2 seconds, dispatch an 'UpdateUsdtPrice' task unless its Redis lock is held.
   *
   * @param \Swoole\Server $serv
   * @param int $time Shared tick counter (by reference).
   */
  private function startUsdtPriceTimer($serv, &$time)
  {
      \swoole_timer_tick(2000, function ($timer) use ($serv, &$time) {
          $date = date('Y-m-d H:i:s');
          if ($time > 3600 && $this->resetLogFile()) {
              $time = 0;
          }
          $time++;

          $lockKey = 'caches:task:lock:usdt_loaded';
          if (RedisService::get($lockKey)) {
              echo "[Task UpdateUsdtPrice {$date}] 间隔时间调用\n";
              return;
          }

          $res = $serv->task([
              'taskName' => 'UpdateUsdtPrice',
              'name' => "更新USDT价格",
              'date' => date('Y-m-d'),
          ]);
          // Lock for 2-3 seconds so the next tick may be skipped.
          RedisService::set($lockKey, true, rand(2, 3));
          echo "[Task UpdateUsdtPrice {$date}] 更新USDT价格:{$res}\n";
      });
  }

  /**
   * Every 20 seconds, dispatch one 'ClearCurrencyPrice' task per currency
   * returned by CurrencyService::getSyncList(), each guarded by its own lock.
   *
   * NOTE(review): the old comment said "every 2 minutes" and a 120000 ms
   * variant was commented out; the active interval is 20000 ms. Confirm
   * which one is intended.
   *
   * @param \Swoole\Server $serv
   * @param int $time Shared tick counter (by reference).
   */
  private function startClearCurrencyPriceTimer($serv, &$time)
  {
      \swoole_timer_tick(20000, function ($timer) use ($serv, &$time) {
          $date = date('Y-m-d H:i:s');
          if ($time > 7200 && $this->resetLogFile()) {
              $time = 0;
          }
          $time++;

          $ccyList = CurrencyService::make()->getSyncList();
          if (!$ccyList) {
              echo "[Task ClearCurrencyPrice {$date}] 暂无需清理价格记录币种\n";
              return;
          }

          foreach ($ccyList as $item) {
              $ccyId = $item['id'];
              $ccy = $item['name'];
              $cacheKey = "caches:task:lock:currency_clear_loaded_{$ccy}";
              if ($ccyId && !RedisService::get($cacheKey)) {
                  $res = $serv->task([
                      'taskName' => 'ClearCurrencyPrice',
                      'name' => "定期清理价格记录",
                      'params' => $item,
                      'date' => date('Y-m-d'),
                  ]);
                  RedisService::set($cacheKey, true, rand(2, 3));
                  echo "[Task ClearCurrencyPrice {$date}] 清理[{$ccy}-USDT]价格记录:{$res}\n";
              } else {
                  echo "[Task ClearCurrencyPrice {$date}] 间隔时间调用\n";
              }
          }
      });
  }

  /**
   * Every 3 seconds, dispatch one 'PledgeAutoTrade' task per user returned by
   * MemberService::pledgeUserList(), guarded by a 5-10 second Redis lock.
   *
   * Currently disabled: onStart() does not call this method.
   *
   * @param \Swoole\Server $serv
   * @param int $time Shared tick counter (by reference).
   */
  private function startPledgeAutoTradeTimer($serv, &$time)
  {
      \swoole_timer_tick(3000, function ($timer) use ($serv, &$time) {
          $date = date('Y-m-d H:i:s');
          if ($time > 7200 && $this->resetLogFile()) {
              $time = 0;
          }
          $time++;

          $userList = MemberService::make()->pledgeUserList();
          if (!$userList) {
              echo "[Task PledgeAutoTrade-0 {$date}] 暂无可自动质押交易用户\n";
              return;
          }

          $lockKey = 'caches:task:lock:pledge_loaded';
          if (RedisService::get($lockKey)) {
              echo "[Task PledgeAutoTrade {$date}] 间隔时间调用\n";
              return;
          }

          foreach ($userList as $item) {
              $userId = $item['id'];
              if ($userId) {
                  $res = $serv->task([
                      'taskName' => 'PledgeAutoTrade',
                      'name' => "自动质押交易",
                      'params' => $item,
                      'date' => date('Y-m-d'),
                  ]);
                  echo "[Task PledgeAutoTrade {$date}] 用户[{$userId}]自动质押交易结果:{$res}\n";
              }
          }
          RedisService::set($lockKey, true, rand(5, 10));
      });
  }
  /**
   * Connection-opened callback: send a short greeting to the new client.
   *
   * NOTE(review): start() does not bind this method to the 'connect' event.
   *
   * @param \Swoole\Server $serv
   * @param int $fd Client connection descriptor.
   * @param int $from_id
   */
  public function onConnect($serv, $fd, $from_id)
  {
      $greeting = "Success {$fd}!";
      $serv->send($fd, $greeting);
  }
  /**
   * Data-received callback: log the message, acknowledge it to the client,
   * then hand the payload to a task worker.
   *
   * NOTE(review): $data is forwarded exactly as received, while onTask()
   * looks up array keys on its payload - confirm what clients should send.
   *
   * @param \Swoole\Server $serv
   * @param int $fd Client connection descriptor.
   * @param int $from_id
   * @param mixed $data Payload received from the client.
   */
  public function onReceive(\Swoole\Server $serv, $fd, $from_id, $data)
  {
      echo "Get Message From Client {$fd}:{$data}\n";

      // Reply to the client right away.
      $ack = json_encode(['result' => 'success']);
      $serv->send($fd, $ack);

      // Run the work asynchronously.
      $serv->task($data);
  }
  /**
   * Task-worker callback: run the job named by $data['taskName'].
   *
   * Handled tasks:
   *  - UpdateUsdtPrice: fetch the USDT sell and buy prices via WalletService.
   *  - ClearCurrencyPrice: clear price logs for $data['params']['name'] via
   *    TickerService; only runs while the clock is before 04:00.
   *
   * Errors are caught (including \Error, which would otherwise escape the
   * callback) and written to the log, because onFinish() does not print the
   * returned message.
   *
   * @param \Swoole\Server $serv
   * @param int $task_id
   * @param int $from_id
   * @param mixed $data Task payload; expected to be an array with 'taskName' and optional 'params'.
   * @return false|string false when a ClearCurrencyPrice precondition fails,
   *                      otherwise a short status or error message.
   */
  public function onTask(\Swoole\Server $serv, $task_id, $from_id, $data)
  {
      $date = date('Y-m-d H:i:s');
      $payload = is_array($data) ? $data : [];
      $taskName = isset($payload['taskName']) ? $payload['taskName'] : '';
      $params = isset($payload['params']) ? $payload['params'] : [];

      try {
          switch ($taskName) {
              case 'UpdateUsdtPrice':
                  // Update the USDT price: sell first, buy only if sell succeeded.
                  $res = WalletService::make()->getUsdtPrice('sell');
                  if ($res && $res1 = WalletService::make()->getUsdtPrice('buy')) {
                      echo "[Task {$taskName}-{$task_id} {$date}] 更新USDT价格:sell-{$res} buy-{$res1}\n";
                  } else {
                      $error = WalletService::make()->getError();
                      $error = $error ? lang($error) : '处理失败';
                      echo "[Task {$taskName}-{$task_id} {$date}] 更新USDT价格:{$error}\n";
                  }
                  break;

              case 'ClearCurrencyPrice':
                  // Periodically clear the price records of one currency.
                  $ccy = isset($params['name']) ? $params['name'] : 0;
                  if (empty($ccy)) {
                      echo "[Task {$taskName}-{$task_id} {$date}] 币种参数错误\n";
                      return false;
                  }
                  // Zero-padded 'H:i' strings compare chronologically.
                  if (date('H:i') >= '04:00') {
                      echo "[Task {$taskName}-{$task_id} {$date}] 不在运行时间内\n";
                      return false;
                  }
                  if ($res = TickerService::make()->clearPriceLog($ccy)) {
                      $res = is_array($res) ? json_encode($res, JSON_UNESCAPED_UNICODE) : '价格记录清理成功';
                      echo "[Task {$taskName}-{$task_id} {$date}] 币种[{$ccy}-USDT]定期清理价格记录结果:{$res}\n";
                  } else {
                      $error = TickerService::make()->getError();
                      $error = $error ? lang($error) : '价格记录清理失败';
                      echo "[Task {$taskName}-{$task_id} {$date}] 币种[{$ccy}-USDT]定期清理价格记录结果:{$error}\n";
                  }
                  break;

              default:
                  // Unknown or missing task name: nothing to run.
                  echo "[Task {$taskName}-{$task_id} {$date}] 暂无任务处理\n";
                  return '暂无任务处理';
          }
      } catch (\Throwable $exception) {
          $message = $exception->getMessage();
          echo "[Task {$taskName}-{$task_id} {$date}] 处理异常:{$message}\n";
          return $message;
      }

      return '任务处理完成';
  }
  /**
   * Task-finished callback: log that the task completed.
   *
   * @param \Swoole\Server $serv
   * @param int $task_id Task id.
   * @param string $data Value returned by the task; not used here.
   */
  public function onFinish(\Swoole\Server $serv, $task_id, $data)
  {
      echo '任务[' . $task_id . ']处理完成...' . "\n";
  }
  /**
   * Connection-closed callback: log which client disconnected.
   *
   * The former `$serv->close()` call is removed: Swoole\Server::close()
   * requires the connection descriptor, so calling it without arguments
   * raises an error, and the connection is already closed by the time this
   * callback runs.
   *
   * @param \Swoole\Server $serv
   * @param int $fd Descriptor of the connection that closed.
   * @param int $from_id
   */
  public function onClose($serv, $fd, $from_id)
  {
      echo "Client {$fd} close connection\n";
  }
  428. }