PriceTask.php 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Services\Api\MemberService;
  4. use App\Services\Common\CurrencyService;
  5. use App\Services\Common\TickerService;
  6. use App\Services\RedisService;
  7. use App\Services\WalletService;
  8. use Illuminate\Console\Command;
  9. use Illuminate\Support\Facades\DB;
  /**
   * Artisan command that controls a Swoole task server used to refresh currency prices.
   *
   * Usage: `php artisan price:task {start|stop|reload|status}`.
   *
   * `start` boots a Swoole server on $host:$port. Once running, a one-second timer asks
   * CurrencyService for the currencies to sync and hands one "UpdateCurrencyPrice" task per
   * currency to the task workers; a short-lived Redis key per currency throttles dispatch.
   * The master and manager PIDs are written to $pidPath so the other sub-commands can find
   * the running server.
   */
  class PriceTask extends Command
  {
      /** @var \Swoole\Server|null Server instance; assigned in start(). */
      protected $serv;

      // Address the task server listens on.
      protected $host = '127.0.0.1';
      protected $port = 6662;

      // Process name.
      protected $taskName = 'priceTask';

      // PID file path; relative to the project root until handle() prefixes it with base_path().
      protected $pidPath = '/storage/priceTask.pid';

      // Task-worker reload flag.
      // NOTE(review): not read anywhere in this class - confirm whether it is still needed.
      protected $onlyReloadTaskWorker = false;

      // Runtime settings handed to Swoole\Server::set().
      protected $options = [
          'worker_num' => 8, // number of worker processes, usually 1-4x the CPU count
          'daemonize' => true, // run as a daemon
          'log_file' => '/storage/logs/price-task.log', // Swoole log file (prefixed with base_path() in handle())
          'log_level' => 0, // log level 0-5: 0-DEBUG, 1-TRACE, 2-INFO, 3-NOTICE, 4-WARNING, 5-ERROR
          'dispatch_mode' => 1, // packet dispatch strategy, 1 = round robin
          'task_worker_num' => 6, // number of task worker processes
          'task_ipc_mode' => 3, // message-queue IPC in contention mode
      ];

      /**
       * The name and signature of the console command.
       *
       * @var string
       */
      protected $signature = 'price:task {op}';

      /**
       * The console command description.
       *
       * @var string
       */
      protected $description = 'Price task server description';

      /**
       * Create a new command instance.
       *
       * @return void
       */
      public function __construct()
      {
          parent::__construct();
      }

      /**
       * Entry point: defines the project path constants, resolves the log/PID paths against
       * the project root and dispatches on the `op` argument.
       *
       * `status` prints the result of status() (or 0 when it is falsy). The other operations
       * return whatever the matching method returns. An unknown operation prints a message
       * and terminates the process via exit().
       *
       * @return mixed
       */
      public function handle()
      {
          ini_set("default_socket_timeout", -1);

          // Project root directory.
          if (!defined('ROOT_PATH')) {
              define('ROOT_PATH', base_path());
          }
          // File upload directory.
          if (!defined('ATTACHMENT_PATH')) {
              define('ATTACHMENT_PATH', base_path('public/uploads'));
          }
          // Image upload directory.
          if (!defined('IMG_PATH')) {
              define('IMG_PATH', base_path('public/uploads/images'));
          }
          // Temporary upload directory.
          if (!defined('UPLOAD_TEMP_PATH')) {
              define('UPLOAD_TEMP_PATH', ATTACHMENT_PATH . "/temp");
          }
          // Image domain.
          if (!defined('IMG_URL')) {
              define('IMG_URL', env('IMG_URL'));
          }
          // Database table prefix.
          if (!defined('DB_PREFIX')) {
              define('DB_PREFIX', DB::connection()->getTablePrefix());
          }

          // Both paths are declared relative to the project root; make them absolute.
          $this->options['log_file'] = base_path() . $this->options['log_file'];
          $this->pidPath = base_path() . $this->pidPath;

          $op = $this->argument('op');
          switch ($op) {
              case 'status': // report whether the server is alive
                  $alive = $this->status();
                  echo $alive ? $alive : 0;
                  break;
              case 'start': // boot the server
                  return $this->start();
              case 'reload': // graceful worker restart
                  return $this->reload();
              case 'stop': // shut the server down
                  return $this->stop();
              default:
                  exit("{$op} command does not exist");
          }
      }

      /**
       * Build the Swoole server, register the event handlers and start it.
       *
       * Only the start/receive/task/finish events are registered; onConnect() and onClose()
       * exist on this class but are not attached here.
       */
      public function start()
      {
          date_default_timezone_set('PRC');

          // Create the server bound to the configured address.
          $this->serv = new \Swoole\Server($this->host, $this->port);
          $this->serv->set($this->options);

          // Register event handlers.
          $handlers = [
              'start' => 'onStart',
              'receive' => 'onReceive',
              'task' => 'onTask',
              'finish' => 'onFinish',
          ];
          foreach ($handlers as $event => $method) {
              $this->serv->on($event, [$this, $method]);
          }

          // Run the server.
          echo "priceTask start...\n";
          $this->serv->start();
      }

      /**
       * Graceful reload: send SIGUSR1 to the manager process recorded in the PID file.
       *
       * Success deliberately returns nothing: handle() returns this value as the command
       * result, so a truthy value could turn into a non-zero exit status.
       *
       * @return false|void false when no PID is recorded or the master process is not alive
       */
      public function reload()
      {
          list($masterPid, $managerPid) = $this->readPids();

          // The original tested an undefined $masterPid here, so reload could never proceed.
          if (empty($masterPid) || empty($managerPid)) {
              return false;
          }
          if (!$this->status($masterPid)) {
              return false;
          }

          \Swoole\Process::kill($managerPid, SIGUSR1);
          echo "priceTask reload...\n";
      }

      /**
       * Stop the running server and remove the PID file.
       *
       * Runs crontab/priceTaskStop.sh when that script exists and exec() is available;
       * otherwise sends SIGTERM to the master process, so the PID file is never removed
       * while the server is left running.
       *
       * @param bool $smooth unused; kept for backward compatibility
       * @return false|void false when no PID is recorded or the master process is not alive
       */
      public function stop($smooth = false)
      {
          list($masterPid) = $this->readPids();
          if (empty($masterPid) || !$this->status($masterPid)) {
              return false;
          }

          $stopScript = base_path() . '/crontab/priceTaskStop.sh';
          if (file_exists($stopScript) && function_exists('exec')) {
              // Preferred path: the project's own kill script.
              exec("{$stopScript}");
          } else {
              // Fallback: ask the master process to shut down.
              \Swoole\Process::kill($masterPid, SIGTERM);
          }

          @unlink($this->pidPath);
          echo "priceTask stop...\n";
      }

      /**
       * Check whether the master process is alive by probing it with signal 0.
       *
       * @param int|string $masterPid PID to probe; when empty, the PID file is consulted
       * @return mixed result of Swoole\Process::kill($pid, 0), or false when no PID is known
       */
      public function status($masterPid = 0)
      {
          if (empty($masterPid)) {
              list($masterPid) = $this->readPids();
          }
          if (empty($masterPid)) {
              return false;
          }

          return \Swoole\Process::kill($masterPid, 0);
      }

      /**
       * Read the PID file written by onStart().
       *
       * @return int[] [masterPid, managerPid]; a PID is 0 when the file or that line is missing
       */
      private function readPids()
      {
          $raw = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
          $lines = $raw ? explode("\n", $raw) : [];

          $masterPid = isset($lines[0]) ? (int) trim($lines[0]) : 0;
          $managerPid = isset($lines[1]) ? (int) trim($lines[1]) : 0;

          return [$masterPid, $managerPid];
      }

      /**
       * "start" event handler: records the PIDs, truncates the log file and installs the
       * one-second timer that dispatches price-update tasks.
       *
       * NOTE(review): Swoole documents the start event as running in the master process,
       * where server calls such as task() are restricted - confirm that dispatching from
       * here works on the deployed Swoole version.
       *
       * @param \Swoole\Server $serv
       */
      public function onStart($serv)
      {
          $pidDir = dirname($this->pidPath);
          if (!is_dir($pidDir)) {
              // mkdir() takes (path, mode, recursive); the original passed (path, true, 755).
              @mkdir($pidDir, 0755, true);
          }

          // Record "master\nmanager" so the stop/reload/status operations can find the server.
          file_put_contents($this->pidPath, "{$serv->master_pid}\n{$serv->manager_pid}");

          $logFile = $this->options['log_file'];
          $date = date('Y-m-d H:i:s');
          if (file_exists($logFile)) {
              // Start with a fresh log.
              file_put_contents($logFile, "Task {$date}:清空日志\n");
          }

          // Timer ticks since the log was last truncated.
          $time = 0;

          // Fires every second; the per-currency Redis lock (2-3 s TTL) throttles dispatch.
          \swoole_timer_tick(1000, function ($timer) use ($serv, $logFile, &$time) {
              $date = date('Y-m-d H:i:s');

              // Truncate the log again once more than 7200 ticks have elapsed.
              if ($time > 7200 && file_exists($logFile)) {
                  $time = 0;
                  file_put_contents($logFile, "Task {$date}:清空日志\n");
              }
              $time++;

              $this->dispatchPriceUpdates($serv, $date);
          });
      }

      /**
       * Hand one UpdateCurrencyPrice task per sync-list currency to the task workers,
       * skipping currencies whose Redis lock key is still present.
       *
       * @param \Swoole\Server $serv
       * @param string $date timestamp used in the log lines
       */
      private function dispatchPriceUpdates($serv, $date)
      {
          $ccyList = CurrencyService::make()->getSyncList();
          if (!$ccyList) {
              echo "[Task UpdateCurrencyPrice {$date}] 暂无需更新价格币种\n";
              return;
          }

          foreach ($ccyList as $item) {
              $ccyId = $item['id'];
              $ccy = $item['name'];
              $lockKey = "caches:task:lock:currency_loaded_{$ccy}";

              // Skip entries without an id and currencies dispatched within the lock window.
              if (!$ccyId || RedisService::get($lockKey)) {
                  echo "[Task UpdateCurrencyPrice {$date}] 更新[{$ccy}-USDT]间隔时间调用\n";
                  continue;
              }

              $res = $serv->task([
                  'taskName' => 'UpdateCurrencyPrice',
                  'name' => "更新同步币种实时价格",
                  'params' => $item,
                  'date' => date('Y-m-d'),
              ]);
              // Lock the currency for 2-3 seconds so it is not dispatched on every tick.
              RedisService::set($lockKey, $item, rand(2, 3));
              echo "[Task UpdateCurrencyPrice {$date}] 更新[{$ccy}-USDT]最新价格:{$res}\n";
          }
      }

      /**
       * Connection handler: greets the client. Not registered in start().
       */
      public function onConnect($serv, $fd, $from_id)
      {
          $serv->send($fd, "Success {$fd}!");
      }

      /**
       * "receive" event handler: acknowledges the client, then forwards the payload to the
       * task workers.
       *
       * NOTE(review): $data is forwarded exactly as received; onTask() reads it with array
       * offsets ('taskName', 'params') - confirm what clients are expected to send.
       */
      public function onReceive(\Swoole\Server $serv, $fd, $from_id, $data)
      {
          echo "Get Message From Client {$fd}:{$data}\n";

          // Reply to the client first, then queue the work.
          $serv->send($fd, json_encode(['result' => 'success']));
          $serv->task($data);
      }

      /**
       * "task" event handler: executes one queued task.
       *
       * For 'UpdateCurrencyPrice' it calls TickerService::updatePrice() for the currency
       * named in params['name'] and logs the outcome.
       *
       * @param \Swoole\Server $serv
       * @param $task_id
       * @param $from_id
       * @param $data
       * @return false|string false when the currency name is missing, the exception message
       *                      when an \Exception is caught, otherwise the fixed string below
       */
      public function onTask(\Swoole\Server $serv, $task_id, $from_id, $data)
      {
          $date = date('Y-m-d H:i:s');
          $taskName = isset($data['taskName']) ? $data['taskName'] : '';
          $params = isset($data['params']) ? $data['params'] : [];

          try {
              switch ($taskName) {
                  case 'UpdateCurrencyPrice': // refresh the price of one currency
                      $ccy = isset($params['name']) ? $params['name'] : 0;
                      if (empty($ccy)) {
                          echo "[Task {$taskName}-{$task_id} {$date}] 币种参数错误\n";
                          return false;
                      }

                      $res = TickerService::make()->updatePrice($ccy);
                      if ($res) {
                          // Array results are logged as JSON with unescaped Unicode.
                          $res = is_array($res) ? json_encode($res, JSON_UNESCAPED_UNICODE) : '价格同步成功';
                          echo "[Task {$taskName}-{$task_id} {$date}] 币种[{$ccy}-USDT]最新价格更新处理结果:{$res}\n";
                      } else {
                          $error = TickerService::make()->getError();
                          $error = $error ? lang($error) : '价格同步失败';
                          echo "[Task {$taskName}-{$task_id} {$date}] 币种[{$ccy}-USDT]最新价格更新处理结果:{$error}\n";
                      }
                      break;
              }
          } catch (\Exception $exception) {
              return $exception->getMessage();
          }

          // Also reached after a handled task; onFinish() does not inspect this value.
          return '暂无任务处理';
      }

      /**
       * "finish" event handler: logs that a task completed.
       *
       * @param \Swoole\Server $serv server instance
       * @param int $task_id task id
       * @param string $data value returned by onTask()
       */
      public function onFinish(\Swoole\Server $serv, $task_id, $data)
      {
          echo "任务[{$task_id}]处理完成...\n";
      }

      /**
       * Connection-closed handler: logs the event. Not registered in start().
       *
       * The original also called $serv->close() without a connection id, which is not a
       * valid call; the connection is already being closed when this event fires, so no
       * further action is taken.
       */
      public function onClose($serv, $fd, $from_id)
      {
          echo "Client {$fd} close connection\n";
      }
  }