8, //worker进程数,一般设置为CPU数的1-4倍 'daemonize' => true, //启用守护进程 'log_file' => '/storage/logs/swoole-task.log', //指定swoole错误日志文件 'log_level' => 0, //日志级别 范围是0-5,0-DEBUG,1-TRACE,2-INFO,3-NOTICE,4-WARNING,5-ERROR 'dispatch_mode' => 1, //数据包分发策略,1-轮询模式 'task_worker_num' => 6, //task进程的数量 'task_ipc_mode' => 3, //使用消息队列通信,并设置为争抢模式 ]; /** * The name and signature of the console command. * * @var string */ protected $signature = 'swoole:task {op}'; /** * The console command description. * * @var string */ protected $description = 'Swoole task server description'; /** * Create a new command instance. * * @return void */ public function __construct() { parent::__construct(); } /** * 入口 * Execute the console command. * * @return mixed */ public function handle() { ini_set("default_socket_timeout", -1); // 项目根目录 defined('ROOT_PATH') or define('ROOT_PATH', base_path()); // 文件上传目录 defined('ATTACHMENT_PATH') or define('ATTACHMENT_PATH', base_path('public/uploads')); // 图片上传目录 defined('IMG_PATH') or define('IMG_PATH', base_path('public/uploads/images')); // 临时存放目录 defined('UPLOAD_TEMP_PATH') or define('UPLOAD_TEMP_PATH', ATTACHMENT_PATH . "/temp"); // 定义普通图片域名 defined('IMG_URL') or define('IMG_URL', env('IMG_URL')); // 数据表前缀 defined('DB_PREFIX') or define('DB_PREFIX', DB::connection()->getTablePrefix()); $this->options['log_file'] = base_path() . $this->options['log_file']; $this->pidPath = base_path() . $this->pidPath; $op = $this->argument('op'); switch ($op) { case 'status': // 状态 $res = $this->status(); echo $res ? $res : 0; break; case 'start': // 运行 return $this->start(); break; case 'reload': // 平滑重启 return $this->reload(); break; case 'stop': // 停止运行 return $this->stop(); break; default: exit("{$op} command does not exist"); break; } } /** * 启动 */ public function start() { date_default_timezone_set('PRC'); // 构建Server对象,监听对应地址 $this->serv = new \Swoole\Server($this->host, $this->port); $this->serv->set($this->options); // 注册事件 $this->serv->on('start', [$this, 'onStart']); $this->serv->on('receive', [$this, 'onReceive']); $this->serv->on('task', [$this, 'onTask']); $this->serv->on('finish', [$this, 'onFinish']); // Run worker echo "swoole start...\n"; $this->serv->start(); } // 安全重启 public function reload() { $pids = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : ''; $pids = $pids ? explode("\n", $pids) : []; $masterPid = isset($pids[0]) ? $pids[0] : ''; $managePid = isset($pids[1]) ? $pids[1] : ''; if (empty($masterPid)) { return false; } if (!$this->status($masterPid)) { return false; } \Swoole\Process::kill($managePid, SIGUSR1); echo "swoole reload...\n"; } /** * 停止 * @param bool $smooth * @return bool */ public function stop($smooth = false) { $pids = file_exists($this->pidPath) ? file_get_contents($this->pidPath) : ''; $pids = $pids ? explode("\n", $pids) : []; $masterPid = isset($pids[0]) ? $pids[0] : ''; $managePid = isset($pids[1]) ? $pids[1] : ''; if (empty($masterPid)) { return false; } if (!$this->status($masterPid)) { return false; } // if ($smooth) { // \Swoole\Process::kill($masterPid, SIGTERM); // try { // while (true) { // \Swoole\Process::kill($masterPid, 0); // } // } catch (\Exception $exception) { // // } // } else { // \Swoole\Process::kill($masterPid, SIGKILL); // } // // if($managePid){ // \Swoole\Process::kill($managePid, SIGKILL); // } // 直接杀 $stoSh = base_path().'/crontab/swooleTaskStop.sh'; echo $stoSh; if(file_exists($stoSh) && function_exists('exec')){ exec("{$stoSh}"); } @unlink($this->pidPath); echo "swoole stop...\n"; } /** * 状态 * @return mixed */ public function status($masterPid = 0) { $res = false; if (empty($masterPid) && file_exists($this->pidPath)) { $pids = file_get_contents($this->pidPath); $pids = $pids ? explode("\n", $pids) : []; $masterPid = isset($pids[0]) ? $pids[0] : ''; } if ($masterPid) { $res = \Swoole\Process::kill($masterPid, 0); } return $res; } public function onStart($serv) { if (!is_dir(dirname($this->pidPath))) { @mkdir(dirname($this->pidPath), true, 755); } //记录进程id,脚本实现自动重启 $pid = "{$serv->master_pid}\n{$serv->manager_pid}"; file_put_contents($this->pidPath, $pid); // 定时任务 $time = 0; $date = date('Y-m-d H:i:s'); if(file_exists($this->options['log_file'])){ $time = 0; file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n"); } // TODO 更新USDT价格 \swoole_timer_tick(30000, function ($timer) use ($serv, &$time) { // 启用定时器,每120秒执行一次 $date = date('Y-m-d H:i:s'); if($time>3600 && file_exists($this->options['log_file'])){ $time = 0; file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n"); } $time++; if(!RedisService::get('caches:task:lock:usdt_loaded')){ $taskData = [ 'taskName' => 'UpdateUsdtPrice', 'name' => "更新USDT价格", 'date' => date('Y-m-d'), ]; $res = $serv->task($taskData); RedisService::set('caches:task:lock:usdt_loaded', true, rand(3,5)); echo "[Task UpdateUsdtPrice {$date}] 更新USDT价格:{$res}\n"; }else{ echo "[Task UpdateUsdtPrice {$date}] 间隔时间调用\n"; } }); // TODO 更新商品数据 \swoole_timer_tick(120000, function ($timer) use ($serv, &$time) { // 启用定时器,每120秒执行一次 $date = date('Y-m-d H:i:s'); if($time>3600 && file_exists($this->options['log_file'])){ $time = 0; file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n"); } $time++; if(!RedisService::get('caches:task:lock:goods_loaded')){ $taskData = [ 'taskName' => 'UpdateGoods', 'name' => "更新商品数据", 'date' => date('Y-m-d'), ]; $res = $serv->task($taskData); RedisService::set('caches:task:lock:goods_loaded', true, rand(3,5)); echo "[Task UpdateGoods {$date}] 更新商品数据:{$res}\n"; }else{ echo "[Task UpdateGoods {$date}] 间隔时间调用\n"; } }); // TODO 更新商品分类 \swoole_timer_tick(120000, function ($timer) use ($serv, &$time) { // 启用定时器,每120秒执行一次 $date = date('Y-m-d H:i:s'); if($time>3600 && file_exists($this->options['log_file'])){ $time = 0; file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n"); } $time++; if(!RedisService::get('caches:task:lock:goods_category_loaded')){ $taskData = [ 'taskName' => 'UpdateGoodsCategory', 'name' => "更新商品分类数据", 'date' => date('Y-m-d'), ]; $res = $serv->task($taskData); RedisService::set('caches:task:lock:goods_category_loaded', true, rand(3,5)); echo "[Task UpdateGoodsCategory {$date}] 更新商品分类数据:{$res}\n"; }else{ echo "[Task UpdateGoodsCategory {$date}] 间隔时间调用\n"; } }); // TODO 更新商品分类 \swoole_timer_tick(120000, function ($timer) use ($serv, &$time) { // 启用定时器,每120秒执行一次 $date = date('Y-m-d H:i:s'); if($time>3600 && file_exists($this->options['log_file'])){ $time = 0; file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n"); } $time++; $cateIds = GoodsCategoryService::make()->getCateIds(); if($cateIds){ foreach($cateIds as $item){ $pid = isset($item['cate_id'])? $item['cate_id'] : 0; if($pid && !RedisService::get("caches:task:lock:goods_category_sub_loaded_{$pid}")){ $taskData = [ 'taskName' => 'UpdateGoodsCategorySub', 'name' => "更新商品分类【{$pid}】的子分类数据", 'pid'=> $pid, 'date' => date('Y-m-d'), ]; $res = $serv->task($taskData); RedisService::set("caches:task:lock:goods_category_sub_loaded_{$pid}", true, rand(3,5)); echo "[Task UpdateGoodsCategorySub {$date}] 更新商品分类【{$pid}】的子分类数据:{$res}\n"; }else{ echo "[Task UpdateGoodsCategorySub {$date}] 间隔时间调用\n"; } } }else{ echo "[Task UpdateGoodsCategorySub {$date}] 没有父级数据\n"; } }); // TODO 更新商品SKU数据 \swoole_timer_tick(120000, function ($timer) use ($serv, &$time) { // 启用定时器,每120秒执行一次 $date = date('Y-m-d H:i:s'); if($time>3600 && file_exists($this->options['log_file'])){ $time = 0; file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n"); } $time++; if(!RedisService::get('caches:task:lock:goods_sku_loaded')){ $taskData = [ 'taskName' => 'UpdateGoodsSku', 'name' => "更新商品SKU数据", 'date' => date('Y-m-d'), ]; $res = $serv->task($taskData); RedisService::set('caches:task:lock:goods_sku_loaded', true, rand(3,5)); echo "[Task UpdateGoodsSku {$date}] 更新商品SKU数据:{$res}\n"; }else{ echo "[Task UpdateGoodsSku {$date}] 间隔时间调用\n"; } }); // TODO 发放积分 \swoole_timer_tick(180000, function ($timer) use ($serv, &$time) { // 启用定时器,每180秒执行一次 $date = date('Y-m-d H:i:s'); if($time>3600 && file_exists($this->options['log_file'])){ $time = 0; file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n"); } $time++; if(!RedisService::get('caches:task:lock:grant_score_loaded')){ $taskData = [ 'taskName' => 'GrantScore', 'name' => "每日发放积分", 'date' => date('Y-m-d'), ]; $res = $serv->task($taskData); RedisService::set('caches:task:lock:grant_score_loaded', true, rand(3,5)); echo "[Task GrantScore {$date}] 每日发放积分:{$res}\n"; }else{ echo "[Task GrantScore {$date}] 间隔时间调用\n"; } }); // TODO 待返积分返还 \swoole_timer_tick(120000, function ($timer) use ($serv, &$time) { // 启用定时器,每120秒执行一次 $date = date('Y-m-d H:i:s'); if($time>3600 && file_exists($this->options['log_file'])){ $time = 0; file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n"); } $time++; if(!RedisService::get('caches:task:lock:wait_score_loaded')){ $taskData = [ 'taskName' => 'ReturnWaitScore', 'name' => "待返积分每日返还", 'date' => date('Y-m-d'), ]; $res = $serv->task($taskData); RedisService::set('caches:task:lock:wait_score_loaded', true, rand(3,5)); echo "[Task ReturnWaitScore {$date}] 待返积分每日返还:{$res}\n"; }else{ echo "[Task ReturnWaitScore {$date}] 间隔时间调用\n"; } }); // TODO 全球分红结算 \swoole_timer_tick(120000, function ($timer) use ($serv, &$time) { // 启用定时器,每120秒执行一次 $date = date('Y-m-d H:i:s'); if($time>3600 && file_exists($this->options['log_file'])){ $time = 0; file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n"); } $time++; if(!RedisService::get('caches:task:lock:global_loaded')){ $taskData = [ 'taskName' => 'GlobalBonus', 'name' => "待返积分每日返还", 'date' => date('Y-m-d'), ]; $res = $serv->task($taskData); RedisService::set('caches:task:lock:global_loaded', true, rand(3,5)); echo "[Task GlobalBonus {$date}] 全球分红结算:{$res}\n"; }else{ echo "[Task GlobalBonus {$date}] 间隔时间调用\n"; } }); // TODO 更新订单状态 \swoole_timer_tick(300000, function ($timer) use ($serv, &$time) { // 启用定时器,每300秒执行一次 $date = date('Y-m-d H:i:s'); if($time>3600 && file_exists($this->options['log_file'])){ $time = 0; file_put_contents($this->options['log_file'],"Task {$date}:清空日志\n"); } $time++; if(!RedisService::get('caches:task:lock:order_loaded')){ $taskData = [ 'taskName' => 'UpdateOrderStatus', 'name' => "更新订单状态", 'date' => date('Y-m-d'), ]; $res = $serv->task($taskData); RedisService::set('caches:task:lock:order_loaded', true, rand(3,5)); echo "[Task UpdateOrderStatus {$date}] 更新订单状态:{$res}\n"; }else{ echo "[Task UpdateOrderStatus {$date}] 间隔时间调用\n"; } }); } //监听连接进入事件 public function onConnect($serv, $fd, $from_id) { $serv->send($fd, "Success {$fd}!"); } // 监听数据接收事件 public function onReceive(\Swoole\Server $serv, $fd, $from_id, $data) { echo "Get Message From Client {$fd}:{$data}\n"; $res['result'] = 'success'; $serv->send($fd, json_encode($res)); // 同步返回消息给客户端 $serv->task($data); // 执行异步任务 } /** * @param \Swoole\Server $serv * @param $task_id * @param $from_id * @param $data * @return false|string */ public function onTask(\Swoole\Server $serv, $task_id, $from_id, $data) { $date = date('Y-m-d H:i:s'); $taskName = isset($data['taskName']) ? $data['taskName'] : ''; switch ($taskName) { case 'UpdateOrderStatus': // 更新订单状态 // 时间限制 if(date('H:i') >= '00:00' && date('H:i') <= '05:00'){ echo "[Task {$taskName} {$date}] 不在运行时间段内\n"; return false; } // 调用处理 if($res = OrderService::make()->updateOrderStatus()){ $res = is_array($res) && $res? json_encode($res, 256) : 'success'; echo "[Task {$taskName} {$date}] 更新订单状态结果:{$res}\n"; }else{ $error = OrderService::make()->getError(); $error = $error? lang($error) : 'failed'; echo "[Task {$taskName} {$date}] 更新订单状态结果:{$error}\n"; } break; case 'UpdateOrderRefundStatus': // 更新售后订单状态 // 时间限制 if(date('H:i') >= '00:00' && date('H:i') <= '05:00'){ echo "[Task {$taskName} {$date}] 不在运行时间段内\n"; return false; } // 调用处理 if($res = OrderService::make()->updateOrderRefundStatus()){ $res = is_array($res) && $res? json_encode($res, 256) : 'success'; echo "[Task {$taskName} {$date}] 更新售后订单状态结果:{$res}\n"; }else{ $error = OrderService::make()->getError(); $error = $error? lang($error) : 'failed'; echo "[Task {$taskName} {$date}] 更新售后订单状态结果:{$error}\n"; } break; case 'UpdateGoods': // 更新商品 // 时间限制 if(date('H:i') >= '00:00' && date('H:i') <= '03:00'){ echo "[Task {$taskName} {$date}] 不在运行时间段内\n"; return false; } // 调用处理 if($res = GoodsService::make()->updateGoods()){ $res = is_array($res) && $res? json_encode($res, 256) : 'success'; echo "[Task {$taskName} {$date}] 商品数据获取更新结果:{$res}\n"; }else{ $error = GoodsService::make()->getError(); $error = $error? lang($error) : 'failed'; echo "[Task {$taskName} {$date}] 商品数据获取更新结果:{$error}\n"; } break; case 'UpdateGoodsSku': // 更新商品SKu数据 // 时间限制 if(date('H:i') >= '00:00' && date('H:i') <= '03:00'){ echo "[Task {$taskName} {$date}] 不在运行时间段内\n"; return false; } // 调用处理 if($res = GoodsService::make()->updateGoodsSku()){ $res = is_array($res) && $res? json_encode($res, 256) : 'success'; echo "[Task {$taskName} {$date}] 更新商品SKu数据结果:{$res}\n"; }else{ $error = GoodsService::make()->getError(); $error = $error? lang($error) : 'failed'; echo "[Task {$taskName} {$date}] 更新商品SKu数据结果:{$error}\n"; } break; case 'UpdateGoodsCategory': // 更新商品分类数据 // 时间限制 if(date('H:i') >= '00:00' && date('H:i') <= '04:00'){ echo "[Task {$taskName} {$date}] 不在运行时间段内\n"; return false; } // 调用处理 if($res = GoodsService::make()->updateGoodsCategory()){ $res = is_array($res) && $res? json_encode($res, 256) : 'success'; echo "[Task {$taskName} {$date}] 更新商品分类数据结果:{$res}\n"; }else{ $error = GoodsService::make()->getError(); $error = $error? lang($error) : 'failed'; echo "[Task {$taskName} {$date}] 更新商品分类数据结果:{$error}\n"; } break; case 'UpdateGoodsCategorySub': // 更新商品分类数据 // 时间限制 if(date('H:i') >= '00:00' && date('H:i') <= '04:00'){ echo "[Task {$taskName} {$date}] 不在运行时间段内\n"; return false; } // 调用处理 $pid = isset($data['pid'])? $data['pid'] : 0; if($res = GoodsService::make()->updateGoodsCategory($pid)){ $res = is_array($res) && $res? json_encode($res, 256) : 'success'; echo "[Task {$taskName} {$date}] 更新商品分类数据结果:{$res}\n"; }else{ $error = GoodsService::make()->getError(); $error = $error? lang($error) : 'failed'; echo "[Task {$taskName} {$date}] 更新商品分类数据结果:{$error}\n"; } break; case 'UpdateUsdtPrice': // 更新USDT价格 // 时间限制 if(date('H:i') >= '01:00' && date('H:i') <= '04:00'){ echo "[Task {$taskName} {$date}] 不在运行时间段内\n"; return false; } // 调用处理 if($res = WalletService::make()->getBianRatePrice(true)){ $res = is_array($res) && $res? json_encode($res, 256) : 'success'; echo "[Task {$taskName} {$date}] 更新USDT价格结果:{$res}\n"; }else{ $error = WalletService::make()->getError(); $error = $error? lang($error) : 'failed'; echo "[Task {$taskName} {$date}] 更新USDT价格结果:{$error}\n"; } break; case 'GrantScore': // 时间限制 if(date('H:i') >= '09:00'){ echo "[Task {$taskName} {$date}] 不在运行时间段内\n"; return false; } // 调用处理 if($res = FinanceService::make()->grantScore()){ $res = is_array($res) && $res? json_encode($res, 256) : 'success'; echo "[Task {$taskName} {$date}] 每日发放积分结果:{$res}\n"; }else{ $error = FinanceService::make()->getError(); $error = $error? lang($error) : 'failed'; echo "[Task {$taskName} {$date}] 每日发放积分结果:{$error}\n"; } break; case 'ReturnWaitScore': // 时间限制 if(date('H:i') >= '06:00'){ echo "[Task {$taskName} {$date}] 不在运行时间段内\n"; return false; } // 调用处理 if($res = FinanceService::make()->returnWaitScore()){ $res = is_array($res) && $res? json_encode($res, 256) : 'success'; echo "[Task {$taskName} {$date}] 待返积分每日返还结果:{$res}\n"; }else{ $error = FinanceService::make()->getError(); $error = $error? lang($error) : 'failed'; echo "[Task {$taskName} {$date}] 待返积分每日返还结果:{$error}\n"; } break; case 'GlobalBonus': // 全球分红 // 时间限制 if(date('H:i') >= '05:00'){ echo "[Task {$taskName} {$date}] 不在运行时间段内\n"; return false; } // 调用处理 if($res = FinanceService::make()->globalBonus()){ $res = is_array($res) && $res? json_encode($res, 256) : 'success'; echo "[Task {$taskName} {$date}] 全球分红结算结果:{$res}\n"; }else{ $error = FinanceService::make()->getError(); $error = $error? lang($error) : 'failed'; echo "[Task {$taskName} {$date}] 全球分红结算结果:{$error}\n"; } break; } return '暂无任务处理'; } /** * @param $serv swoole_server swoole_server对象 * @param $task_id int 任务id * @param $data string 任务返回的数据 */ public function onFinish(\Swoole\Server $serv, $task_id, $data) { // echo "任务处理完成...\n"; } // 监听连接关闭事件 public function onClose($serv, $fd, $from_id) { echo "Client {$fd} close connection\n"; $serv->close(); } }