WalletTask.php 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Services\RedisService;
  4. use App\Services\WalletService;
  5. use Illuminate\Console\Command;
  6. use Illuminate\Support\Facades\DB;
  7. class WalletTask extends Command
  8. {
  9. protected $serv;
  10. protected $host = '127.0.0.1';
  11. protected $port = 6626;
  12. // 进程名称
  13. protected $taskName = 'walletTask';
  14. // PID路径
  15. protected $pidPath = '/storage/wallet.pid';
  16. // task
  17. protected $onlyReloadTaskWorker = false;
  18. // 设置运行时参数
  19. protected $options = [
  20. 'worker_num' => 8, //worker进程数,一般设置为CPU数的1-4倍
  21. 'daemonize' => true, //启用守护进程
  22. 'log_file' => '/storage/logs/wallet-task.log', //指定swoole错误日志文件
  23. 'log_level' => 0, //日志级别 范围是0-5,0-DEBUG,1-TRACE,2-INFO,3-NOTICE,4-WARNING,5-ERROR
  24. 'dispatch_mode' => 1, //数据包分发策略,1-轮询模式
  25. 'task_worker_num' => 6, //task进程的数量
  26. 'task_ipc_mode' => 3, //使用消息队列通信,并设置为争抢模式
  27. ];
  28. /**
  29. * The name and signature of the console command.
  30. *
  31. * @var string
  32. */
  33. protected $signature = 'wallet:task {op}';
  34. /**
  35. * The console command description.
  36. *
  37. * @var string
  38. */
  39. protected $description = 'Wallet task server description';
  40. /**
  41. * Create a new command instance.
  42. *
  43. * @return void
  44. */
  45. public function __construct()
  46. {
  47. parent::__construct();
  48. }
  49. /**
  50. * 入口
  51. * Execute the console command.
  52. *
  53. * @return mixed
  54. */
  55. public function handle()
  56. {
  57. ini_set("default_socket_timeout", -1);
  58. // 项目根目录
  59. defined('ROOT_PATH') or define('ROOT_PATH', base_path());
  60. // 文件上传目录
  61. defined('ATTACHMENT_PATH') or define('ATTACHMENT_PATH', base_path('public/uploads'));
  62. // 图片上传目录
  63. defined('IMG_PATH') or define('IMG_PATH', base_path('public/uploads/images'));
  64. // 临时存放目录
  65. defined('UPLOAD_TEMP_PATH') or define('UPLOAD_TEMP_PATH', ATTACHMENT_PATH . "/temp");
  66. // 定义普通图片域名
  67. defined('IMG_URL') or define('IMG_URL', env('IMG_URL'));
  68. // 数据表前缀
  69. defined('DB_PREFIX') or define('DB_PREFIX', DB::connection()->getTablePrefix());
  70. $this->options['log_file'] = base_path() . $this->options['log_file'];
  71. $this->pidPath = base_path() . $this->pidPath;
  72. $op = $this->argument('op');
  73. switch ($op) {
  74. case 'status': // 状态
  75. $res = $this->status();
  76. echo $res ? $res : 0;
  77. break;
  78. case 'start': // 运行
  79. return $this->start();
  80. break;
  81. case 'reload': // 平滑重启
  82. return $this->reload();
  83. break;
  84. case 'stop': // 停止运行
  85. return $this->stop();
  86. break;
  87. default:
  88. exit("{$op} command does not exist");
  89. break;
  90. }
  91. }
  92. /**
  93. * 启动
  94. */
  95. public function start()
  96. {
  97. date_default_timezone_set('PRC');
  98. // 构建Server对象,监听对应地址
  99. $this->serv = new \Swoole\Server($this->host, $this->port);
  100. $this->serv->set($this->options);
  101. // 注册事件
  102. $this->serv->on('start', [$this, 'onStart']);
  103. $this->serv->on('receive', [$this, 'onReceive']);
  104. $this->serv->on('task', [$this, 'onTask']);
  105. $this->serv->on('finish', [$this, 'onFinish']);
  106. // Run worker
  107. echo "wallet start...\n";
  108. $this->serv->start();
  109. }
  110. // 安全重启
  111. public function reload()
  112. {
  113. $pids = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
  114. $pids = $pids ? explode("\n", $pids) : [];
  115. $masterPid = isset($pids[0]) ? $pids[0] : '';
  116. $managePid = isset($pids[1]) ? $pids[1] : '';
  117. if (empty($masterPid)) {
  118. return false;
  119. }
  120. if (!$this->status($masterPid)) {
  121. return false;
  122. }
  123. \Swoole\Process::kill($managePid, SIGUSR1);
  124. echo "wallet reload...\n";
  125. }
  126. /**
  127. * 停止
  128. * @param bool $smooth
  129. * @return bool
  130. */
  131. public function stop($smooth = false)
  132. {
  133. $pids = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
  134. $pids = $pids ? explode("\n", $pids) : [];
  135. $masterPid = isset($pids[0]) ? $pids[0] : '';
  136. $managePid = isset($pids[1]) ? $pids[1] : '';
  137. if (empty($masterPid)) {
  138. return false;
  139. }
  140. if (!$this->status($masterPid)) {
  141. return false;
  142. }
  143. // if ($smooth) {
  144. // \Swoole\Process::kill($masterPid, SIGTERM);
  145. // try {
  146. // while (true) {
  147. // \Swoole\Process::kill($masterPid, 0);
  148. // }
  149. // } catch (\Exception $exception) {
  150. //
  151. // }
  152. // } else {
  153. // \Swoole\Process::kill($masterPid, SIGKILL);
  154. // }
  155. //
  156. // if($managePid){
  157. // \Swoole\Process::kill($managePid, SIGKILL);
  158. // }
  159. // 直接杀
  160. $stoSh = base_path().'/crontab/walletTaskStop.sh';
  161. echo $stoSh;
  162. if(file_exists($stoSh) && function_exists('exec')){
  163. exec("{$stoSh}");
  164. }
  165. @unlink($this->pidPath);
  166. echo "wallet stop...\n";
  167. }
  168. /**
  169. * 状态
  170. * @return mixed
  171. */
  172. public function status($masterPid = 0)
  173. {
  174. $res = false;
  175. if (empty($masterPid) && file_exists($this->pidPath)) {
  176. $pids = file_get_contents($this->pidPath);
  177. $pids = $pids ? explode("\n", $pids) : [];
  178. $masterPid = isset($pids[0]) ? $pids[0] : '';
  179. }
  180. if ($masterPid) {
  181. $res = \Swoole\Process::kill($masterPid, 0);
  182. }
  183. return $res;
  184. }
  185. public function onStart($serv)
  186. {
  187. if (!is_dir(dirname($this->pidPath))) {
  188. @mkdir(dirname($this->pidPath), true, 755);
  189. }
  190. //记录进程id,脚本实现自动重启
  191. $pid = "{$serv->master_pid}\n{$serv->manager_pid}";
  192. file_put_contents($this->pidPath, $pid);
  193. // 定时任务
  194. $time = 0;
  195. $date = date('Y-m-d H:i:s');
  196. if(file_exists($this->options['log_file'])){
  197. $time = 0;
  198. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  199. }
  200. \swoole_timer_tick(20000, function ($timer) use ($serv, &$time) { // 启用定时器,每20秒执行一次
  201. $date = date('Y-m-d H:i:s');
  202. if($time>7200 && file_exists($this->options['log_file'])){
  203. $time = 0;
  204. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  205. }
  206. $time++;
  207. $wallets = WalletService::make()->getWalletList();
  208. if($wallets){
  209. if(!RedisService::get('caches:task:lock:wallet_loaded')){
  210. foreach ($wallets as $item){
  211. $taskData = [
  212. 'taskName' => 'listenWallet',
  213. 'name' => "平台钱包地址监控",
  214. 'params' => $item,
  215. 'date' => date('Y-m-d'),
  216. ];
  217. $res = $serv->task($taskData);
  218. }
  219. RedisService::set('caches:task:lock:wallet_loaded', true, rand(3,5));
  220. echo "[Task listenWallet {$date}] 平台钱包地址监控调用完成:{$res}\n";
  221. }else{
  222. echo "[Task listenWallet {$date}] 间隔时间调用\n";
  223. }
  224. }else{
  225. echo "[Task listenWallet {$date}] 钱包地址未配置\n";
  226. }
  227. });
  228. }
  229. //监听连接进入事件
  230. public function onConnect($serv, $fd, $from_id)
  231. {
  232. $serv->send($fd, "Success {$fd}!");
  233. }
  234. // 监听数据接收事件
  235. public function onReceive(\Swoole\Server $serv, $fd, $from_id, $data)
  236. {
  237. echo "Get Message From Client {$fd}:{$data}\n";
  238. $res['result'] = 'success';
  239. $serv->send($fd, json_encode($res)); // 同步返回消息给客户端
  240. $serv->task($data); // 执行异步任务
  241. }
  242. /**
  243. * @param \Swoole\Server $serv
  244. * @param $task_id
  245. * @param $from_id
  246. * @param $data
  247. * @return false|string
  248. */
  249. public function onTask(\Swoole\Server $serv, $task_id, $from_id, $data)
  250. {
  251. $date = date('Y-m-d H:i:s');
  252. $taskName = isset($data['taskName']) ? $data['taskName'] : '';
  253. $params = isset($data['params']) ? $data['params'] : [];
  254. switch ($taskName) {
  255. case 'listenWallet': // 钱包监控
  256. $address = isset($params['address'])? $params['address'] : '';
  257. $type = isset($params['type'])? $params['type'] : 1;
  258. if(empty($address)){
  259. echo "[Task {$taskName} {$date}] address 地址参数错误\n";
  260. return false;
  261. }
  262. // 调用处理
  263. if($res = WalletService::make()->listenTrcWallet($address, $type)){
  264. $success = isset($res['success'])? $res['success'] : 0;
  265. $total = isset($res['total'])? $res['total'] : 0;
  266. echo "[Task {$taskName} {$date}] 钱包地址监控处理结果:".($total? "监控{$total}条记录,成功处理{$success}条":'success')."\n";
  267. }else{
  268. $error = WalletService::make()->getError();
  269. $error = $error? lang($error) : 'failed';
  270. echo "[Task {$taskName} {$date}] 钱包地址监控处理结果:{$error}\n";
  271. }
  272. break;
  273. }
  274. return "[Task {$taskName} {$date}] 暂无任务处理\n";
  275. }
  276. /**
  277. * @param $serv swoole_server swoole_server对象
  278. * @param $task_id int 任务id
  279. * @param $data string 任务返回的数据
  280. */
  281. public function onFinish(\Swoole\Server $serv, $task_id, $data)
  282. {
  283. //
  284. echo "任务处理完成...\n";
  285. }
  286. // 监听连接关闭事件
  287. public function onClose($serv, $fd, $from_id)
  288. {
  289. echo "Client {$fd} close connection\n";
  290. $serv->close();
  291. }
  292. }