SwooleTask.php 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Services\Api\MemberService;
  4. use App\Services\Api\OrderService;
  5. use App\Services\Api\PriceService;
  6. use App\Services\Api\SettleService;
  7. use App\Services\ConfigService;
  8. use App\Services\RedisService;
  9. use Illuminate\Console\Command;
  10. use Illuminate\Support\Facades\DB;
  11. class SwooleTask extends Command
  12. {
  13. protected $serv;
  14. protected $host = '127.0.0.1';
  15. protected $port = 6670;
  16. // 进程名称
  17. protected $taskName = 'swooleTask';
  18. // PID路径
  19. protected $pidPath = '/storage/swoole.pid';
  20. // task
  21. protected $onlyReloadTaskWorker = false;
  22. // 设置运行时参数
  23. protected $options = [
  24. 'worker_num' => 8, //worker进程数,一般设置为CPU数的1-4倍
  25. 'daemonize' => true, //启用守护进程
  26. 'log_file' => '/storage/logs/swoole-task.log', //指定swoole错误日志文件
  27. 'log_level' => 5, //日志级别 范围是0-5,0-DEBUG,1-TRACE,2-INFO,3-NOTICE,4-WARNING,5-ERROR
  28. 'dispatch_mode' => 1, //数据包分发策略,1-轮询模式
  29. 'task_worker_num' => 6, //task进程的数量
  30. 'task_ipc_mode' => 3, //使用消息队列通信,并设置为争抢模式
  31. ];
  32. /**
  33. * The name and signature of the console command.
  34. *
  35. * @var string
  36. */
  37. protected $signature = 'swoole:task {op}';
  38. /**
  39. * The console command description.
  40. *
  41. * @var string
  42. */
  43. protected $description = 'Swoole task server description';
  44. /**
  45. * Create a new command instance.
  46. *
  47. * @return void
  48. */
  49. public function __construct()
  50. {
  51. parent::__construct();
  52. }
  53. /**
  54. * 入口
  55. * Execute the console command.
  56. *
  57. * @return mixed
  58. */
  59. public function handle()
  60. {
  61. ini_set("default_socket_timeout", -1);
  62. // 项目根目录
  63. defined('ROOT_PATH') or define('ROOT_PATH', base_path());
  64. // 文件上传目录
  65. defined('ATTACHMENT_PATH') or define('ATTACHMENT_PATH', base_path('public/uploads'));
  66. // 图片上传目录
  67. defined('IMG_PATH') or define('IMG_PATH', base_path('public/uploads/images'));
  68. // 临时存放目录
  69. defined('UPLOAD_TEMP_PATH') or define('UPLOAD_TEMP_PATH', ATTACHMENT_PATH . "/temp");
  70. // 定义普通图片域名
  71. defined('IMG_URL') or define('IMG_URL', env('IMG_URL'));
  72. // 数据表前缀
  73. defined('DB_PREFIX') or define('DB_PREFIX', DB::connection()->getTablePrefix());
  74. $this->options['log_file'] = base_path() . $this->options['log_file'];
  75. $this->pidPath = base_path() . $this->pidPath;
  76. $op = $this->argument('op');
  77. switch ($op) {
  78. case 'status': // 状态
  79. $res = $this->status();
  80. echo $res ? $res : 0;
  81. break;
  82. case 'start': // 运行
  83. return $this->start();
  84. break;
  85. case 'reload': // 平滑重启
  86. return $this->reload();
  87. break;
  88. case 'stop': // 停止运行
  89. return $this->stop();
  90. break;
  91. default:
  92. exit("{$op} command does not exist");
  93. break;
  94. }
  95. }
  96. /**
  97. * 启动
  98. */
  99. public function start()
  100. {
  101. date_default_timezone_set('PRC');
  102. // 构建Server对象,监听对应地址
  103. $this->serv = new \Swoole\Server($this->host, env('SWOOLE_PORT', $this->port));
  104. $this->serv->set($this->options);
  105. // 注册事件
  106. $this->serv->on('start', [$this, 'onStart']);
  107. $this->serv->on('receive', [$this, 'onReceive']);
  108. $this->serv->on('task', [$this, 'onTask']);
  109. $this->serv->on('finish', [$this, 'onFinish']);
  110. // Run worker
  111. echo "swoole start...\n";
  112. $this->serv->start();
  113. }
  114. // 安全重启
  115. public function reload()
  116. {
  117. $pids = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
  118. $pids = $pids ? explode("\n", $pids) : [];
  119. $masterPid = isset($pids[0]) ? $pids[0] : '';
  120. $managePid = isset($pids[1]) ? $pids[1] : '';
  121. if (empty($masterPid)) {
  122. return false;
  123. }
  124. if (!$this->status($masterPid)) {
  125. return false;
  126. }
  127. \Swoole\Process::kill($managePid, SIGUSR1);
  128. echo "swoole reload...\n";
  129. }
  130. /**
  131. * 停止
  132. * @param bool $smooth
  133. * @return bool
  134. */
  135. public function stop($smooth = false)
  136. {
  137. $pids = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : '';
  138. $pids = $pids ? explode("\n", $pids) : [];
  139. $masterPid = isset($pids[0]) ? $pids[0] : '';
  140. if (empty($masterPid)) {
  141. return false;
  142. }
  143. if (!$this->status($masterPid)) {
  144. return false;
  145. }
  146. // 直接杀
  147. $stoSh = base_path().'/crontab/swooleTaskStop.sh';
  148. echo $stoSh;
  149. if(file_exists($stoSh) && function_exists('exec')){
  150. exec("{$stoSh}");
  151. }
  152. @unlink($this->pidPath);
  153. echo "swoole stop...\n";
  154. }
  155. /**
  156. * 状态
  157. * @return mixed
  158. */
  159. public function status($masterPid = 0)
  160. {
  161. $res = false;
  162. if (empty($masterPid) && file_exists($this->pidPath)) {
  163. $pids = file_get_contents($this->pidPath);
  164. $pids = $pids ? explode("\n", $pids) : [];
  165. $masterPid = isset($pids[0]) ? $pids[0] : '';
  166. }
  167. if ($masterPid) {
  168. $res = \Swoole\Process::kill($masterPid, 0);
  169. }
  170. return $res;
  171. }
  172. public function onStart($serv)
  173. {
  174. if (!is_dir(dirname($this->pidPath))) {
  175. @mkdir(dirname($this->pidPath), true, 755);
  176. }
  177. //记录进程id,脚本实现自动重启
  178. $pid = "{$serv->master_pid}\n{$serv->manager_pid}";
  179. file_put_contents($this->pidPath, $pid);
  180. // 定时任务
  181. $time = 0;
  182. $date = date('Y-m-d H:i:s');
  183. if(file_exists($this->options['log_file'])){
  184. $time = 0;
  185. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  186. }
  187. // 清除日志
  188. \swoole_timer_tick(1000, function ($timer) use ($serv, &$time) { // 启用定时器,每5秒执行一次
  189. $date = date('Y-m-d H:i:s');
  190. if($time>7200 && file_exists($this->options['log_file'])){
  191. $time = 0;
  192. file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n");
  193. }
  194. $time++;
  195. });
  196. // 订单自动收货
  197. \swoole_timer_tick(10000, function ($timer) use ($serv, &$time) { // 启用定时器,每10秒执行一次
  198. $date = date('Y-m-d H:i:s');
  199. $orders = OrderService::make()->getCompleteOrders();
  200. if($orders){
  201. foreach ($orders as $k => $item){
  202. $orderNo = isset($item['order_no'])?$item['order_no'] : '';
  203. if(!RedisService::get('caches:task:lock:order_complete_loaded_'.$orderNo)){
  204. $taskData = [
  205. 'taskName' => 'completeOrder',
  206. 'name' => "购物订单自动收货处理",
  207. 'date' => date('Y-m-d'),
  208. 'params'=> $item,
  209. ];
  210. $res = $serv->task($taskData);
  211. RedisService::set('caches:task:lock:order_complete_loaded_'.$orderNo, true, rand(3,5));
  212. echo "[Task completeOrder {$date}] 购物订单【{$orderNo}】自动收货处理结果:{$res}\n";
  213. }else{
  214. echo "[Task completeOrder {$date}] 购物订单【{$orderNo}】自动收货处理间隔时间调用\n";
  215. }
  216. }
  217. }else{
  218. echo "[Task completeOrder {$date}] 暂无可自动收货的购物订单\n";
  219. }
  220. });
  221. // 订单收益补充结算
  222. \swoole_timer_tick(20000, function ($timer) use ($serv, &$time) { // 启用定时器,每20秒执行一次
  223. $date = date('Y-m-d H:i:s');
  224. $orders = OrderService::make()->getSettleOrders();
  225. if($orders){
  226. foreach ($orders as $k => $item){
  227. $orderNo = isset($item['order_no'])?$item['order_no'] : '';
  228. if(!RedisService::get('caches:task:lock:order_settle_loaded_'.$orderNo)){
  229. $taskData = [
  230. 'taskName' => 'settleOrder',
  231. 'name' => "订单【{$orderNo}】收益自动结算处理",
  232. 'date' => date('Y-m-d'),
  233. 'params'=> $item,
  234. ];
  235. $res = $serv->task($taskData);
  236. RedisService::set('caches:task:lock:order_settle_loaded_'.$orderNo, true, rand(3,5));
  237. echo "[Task settleOrder {$date}] 订单【{$orderNo}】收益自动结算处理结果:{$res}\n";
  238. }else{
  239. echo "[Task settleOrder {$date}] 订单【{$orderNo}】收益自动结算处理间隔时间调用\n";
  240. }
  241. }
  242. }else{
  243. echo "[Task settleOrder {$date}] 暂无可结算收益的订单\n";
  244. }
  245. });
  246. // 用户自动复购
  247. \swoole_timer_tick(120000, function ($timer) use ($serv, &$time) { // 启用定时器,每2分钟执行一次
  248. $date = date('Y-m-d H:i:s');
  249. $users = MemberService::make()->getReplyBuyUsers();
  250. if($users){
  251. foreach ($users as $k => $item){
  252. $nickname = isset($item['nickname'])?$item['nickname'] : '';
  253. $userId = isset($item['id'])?$item['id'] : '';
  254. if(!RedisService::get('caches:task:lock:user_reply_buy_loaded_'.$userId)){
  255. $taskData = [
  256. 'taskName' => 'rebuyOrder',
  257. 'name' => "用户【{$nickname}-{$userId}】自动复购",
  258. 'date' => date('Y-m-d'),
  259. 'params'=> $item,
  260. ];
  261. $res = $serv->task($taskData);
  262. RedisService::set('caches:task:lock:user_reply_buy_loaded_'.$userId, true, rand(3,5));
  263. echo "[Task rebuyOrder {$date}] 用户【{$nickname}-{$userId}】自动复购处理结果:{$res}\n";
  264. }else{
  265. echo "[Task rebuyOrder {$date}] 用户【{$nickname}-{$userId}】自动复购间隔时间调用\n";
  266. }
  267. }
  268. }else{
  269. echo "[Task rebuyOrder {$date}] 暂无需要自动复购的用户\n";
  270. }
  271. });
  272. // 更新资产价格
  273. \swoole_timer_tick(5000, function ($timer) use ($serv, &$time) { // 启用定时器,每5秒执行一次
  274. $date = date('Y-m-d H:i:s');
  275. $updatePriceTime = ConfigService::make()->getConfigByCode('update_price_time','');
  276. $updatePriceTime = $updatePriceTime? $updatePriceTime : '00:00:00';
  277. $updatePriceTime = strtotime(date("Y-m-d {$updatePriceTime}"));
  278. $updatePriceDate = date("Y-m-d H:i",$updatePriceTime);
  279. $updatePriceEndDate = date("Y-m-d H:i",$updatePriceTime+300);
  280. $currentTime = time();
  281. if($currentTime < $updatePriceTime || $currentTime > $updatePriceTime + 300){
  282. echo "[Task updatePrice {$date}] 未在更新时间[{$updatePriceDate}~{$updatePriceEndDate}]范围内\n";
  283. return false;
  284. }
  285. $taskData = [
  286. 'taskName' => 'updatePrice',
  287. 'name' => "更新每日数字资产价格",
  288. 'date' => date('Y-m-d'),
  289. ];
  290. $res = $serv->task($taskData);
  291. echo "[Task updatePrice {$date}] 更新每日数字资产价格处理结果:{$res}\n";
  292. });
  293. }
  294. //监听连接进入事件
  295. public function onConnect($serv, $fd, $from_id)
  296. {
  297. $serv->send($fd, "Success {$fd}!");
  298. }
  299. // 监听数据接收事件
  300. public function onReceive(\Swoole\Server $serv, $fd, $from_id, $data)
  301. {
  302. echo "Get Message From Client {$fd}:{$data}\n";
  303. $res['result'] = 'success';
  304. $serv->send($fd, json_encode($res)); // 同步返回消息给客户端
  305. $serv->task($data); // 执行异步任务
  306. }
  307. /**
  308. * @param \Swoole\Server $serv
  309. * @param $task_id
  310. * @param $from_id
  311. * @param $data
  312. * @return false|string
  313. */
  314. public function onTask(\Swoole\Server $serv, $task_id, $from_id, $data)
  315. {
  316. $date = date('Y-m-d H:i:s');
  317. $taskName = isset($data['taskName']) ? $data['taskName'] : '';
  318. $params = isset($data['params']) ? $data['params'] : [];
  319. try {
  320. switch ($taskName) {
  321. case 'completeOrder': // 自动收货
  322. $orderId = isset($params['id'])? $params['id'] : 0;
  323. $orderNo = isset($params['order_no'])? $params['order_no'] : '';
  324. $userId = isset($params['user_id'])? $params['user_id'] : 0;
  325. if($orderId<=0 || $userId<=0){
  326. echo "[Task {$taskName} {$date}][{$task_id}] 该购物订单参数错误\n";
  327. return false;
  328. }
  329. // 调用处理
  330. if($res = OrderService::make()->complete($userId, $orderId, false)){
  331. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  332. echo "[Task {$taskName} {$date}][{$task_id}] 购物订单【{$orderNo}】自动收货处理结果:{$res}\n";
  333. }else{
  334. $error = OrderService::make()->getError();
  335. $error = $error? lang($error) : 'failed';
  336. echo "[Task {$taskName} {$date}][{$task_id}] 购物订单【{$orderNo}】自动收货处理结果:{$error}\n";
  337. }
  338. break;
  339. case 'settleOrder': // 补充结算订单
  340. $orderId = isset($params['id'])? $params['id'] : 0;
  341. $orderNo = isset($params['order_no'])? $params['order_no'] : '';
  342. $userId = isset($params['user_id'])? $params['user_id'] : 0;
  343. if($orderId<=0 || $userId<=0){
  344. echo "[Task {$taskName} {$date}][{$task_id}] 该订单参数错误\n";
  345. return false;
  346. }
  347. // 调用处理
  348. if($res = SettleService::make()->commissionSettle($orderId)){
  349. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  350. echo "[Task {$taskName} {$date}][{$task_id}] 礼包订单【{$orderNo}】结算处理结果:{$res}\n";
  351. }else{
  352. $error = SettleService::make()->getError();
  353. $error = $error? lang($error) : 'failed';
  354. echo "[Task {$taskName} {$date}][{$task_id}] 礼包订单【{$orderNo}】结算处理结果:{$error}\n";
  355. }
  356. break;
  357. case 'rebuyOrder': // 自动复购
  358. $userId = isset($params['id'])? $params['id'] : 0;
  359. $nickname = isset($params['nickname'])? $params['nickname'] : '';
  360. $buyType = isset($params['buy_type'])? $params['buy_type'] : 0;
  361. if($userId<=0 || $buyType<=0){
  362. echo "[Task {$taskName} {$date}][{$task_id}] 该用户【{$nickname}】复购参数错误\n";
  363. return false;
  364. }
  365. // 调用处理
  366. if($res = OrderService::make()->rebuyOrder($userId, $buyType)){
  367. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  368. echo "[Task {$taskName} {$date}][{$task_id}] 用户【{$nickname}-{$userId}】自动复购处理结果:{$res}\n";
  369. }else{
  370. $error = OrderService::make()->getError();
  371. $error = $error? lang($error) : 'failed';
  372. echo "[Task {$taskName} {$date}][{$task_id}] 用户【{$nickname}-{$userId}】自动复购处理结果:{$error}\n";
  373. }
  374. break;
  375. case 'updatePrice': // 更新每日数字资产价格
  376. // 调用处理
  377. if($res = PriceService::make()->updatePrice(1)){
  378. $res = is_array($res) && $res? json_encode($res, 256) : 'success';
  379. echo "[Task {$taskName} {$date}][{$task_id}] 更新每日数字资产价格处理结果:{$res}\n";
  380. }else{
  381. $error = PriceService::make()->getError();
  382. $error = $error? lang($error) : 'failed';
  383. echo "[Task {$taskName} {$date}][{$task_id}] 更新每日数字资产价格处理结果:{$error}\n";
  384. }
  385. break;
  386. }
  387. } catch(\Exception $exception){
  388. return $exception->getMessage();
  389. }
  390. return "[{$task_id}]暂无任务处理";
  391. }
  392. /**
  393. * @param $serv swoole_server swoole_server对象
  394. * @param $task_id int 任务id
  395. * @param $data string 任务返回的数据
  396. */
  397. public function onFinish(\Swoole\Server $serv, $task_id, $data)
  398. {
  399. //
  400. //echo "任务处理完成...\n";
  401. }
  402. // 监听连接关闭事件
  403. public function onClose($serv, $fd, $from_id)
  404. {
  405. echo "Client {$fd} close connection\n";
  406. $serv->close();
  407. }
  408. }