123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326 |
- <?php
- namespace App\Console\Commands;
- use App\Services\RedisService;
- use Illuminate\Console\Command;
- use Illuminate\Support\Facades\DB;
/**
 * Artisan command that controls a Swoole TCP task server.
 *
 * Usage: `php artisan swoole:task {status|start|reload|stop}`.
 *
 * The master and manager PIDs of the running server are stored, one per line,
 * in the PID file so that later invocations can query, reload or stop it.
 */
class SwooleTask extends Command
{
    /** @var \Swoole\Server|null Server instance; only set once start() has run. */
    protected $serv;

    /** @var string Address the server listens on. */
    protected $host = '127.0.0.1';

    /** @var int Port the server listens on. */
    protected $port = 6622;

    /** @var string Process name. */
    protected $taskName = 'swooleTask';

    /** @var string PID file path; handle() prefixes it with base_path(). */
    protected $pidPath = '/storage/swoole.pid';

    /** @var bool When true, reload() restarts only the task workers (SIGUSR2). */
    protected $onlyReloadTaskWorker = false;

    /** @var array Runtime settings handed to \Swoole\Server::set(). */
    protected $options = [
        'worker_num' => 8, // number of worker processes; usually 1-4x the CPU count
        'daemonize' => true, // run as a daemon
        'log_file' => '/storage/logs/swoole-task.log', // Swoole log file; handle() prefixes it with base_path()
        'log_level' => 0, // log level 0-5: 0-DEBUG, 1-TRACE, 2-INFO, 3-NOTICE, 4-WARNING, 5-ERROR
        'dispatch_mode' => 1, // packet dispatch strategy; 1 = round-robin
        'task_worker_num' => 6, // number of task worker processes
        'task_ipc_mode' => 3, // communicate through a message queue, in contention mode
    ];

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'swoole:task {op}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Swoole task server description';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Entry point: defines the project constants, resolves the log/PID paths
     * against the project root and dispatches on the `op` argument.
     *
     * @return int Exit code: 0 for every known operation, 1 for an unknown one.
     */
    public function handle()
    {
        ini_set('default_socket_timeout', '-1');

        // Project root directory
        defined('ROOT_PATH') or define('ROOT_PATH', base_path());
        // File upload directory
        defined('ATTACHMENT_PATH') or define('ATTACHMENT_PATH', base_path('public/uploads'));
        // Image upload directory
        defined('IMG_PATH') or define('IMG_PATH', base_path('public/uploads/images'));
        // Temporary storage directory
        defined('UPLOAD_TEMP_PATH') or define('UPLOAD_TEMP_PATH', ATTACHMENT_PATH . "/temp");
        // Domain used for regular images
        defined('IMG_URL') or define('IMG_URL', env('IMG_URL'));
        // Database table prefix
        defined('DB_PREFIX') or define('DB_PREFIX', DB::connection()->getTablePrefix());

        $this->options['log_file'] = base_path() . $this->options['log_file'];
        $this->pidPath = base_path() . $this->pidPath;

        $op = $this->argument('op');

        // The boolean results of reload()/stop() are deliberately NOT returned:
        // Artisan casts the handler's return value to the exit code, so `true`
        // would turn into exit code 1. Known operations keep exiting with 0.
        switch ($op) {
            case 'status':
                echo $this->status() ? 1 : 0;
                return 0;
            case 'start':
                $this->start();
                return 0;
            case 'reload': // graceful restart
                $this->reload();
                return 0;
            case 'stop':
                $this->stop();
                return 0;
            default:
                echo "{$op} command does not exist";
                return 1;
        }
    }

    /**
     * Build the server, register the event callbacks and start it.
     *
     * Only start/receive/task/finish are registered; onConnect() and onClose()
     * exist on this class but are not attached to the server.
     */
    public function start()
    {
        date_default_timezone_set('PRC');

        // Build the server object listening on the configured address
        $this->serv = new \Swoole\Server($this->host, $this->port);
        $this->serv->set($this->options);

        // Register events
        $this->serv->on('start', [$this, 'onStart']);
        $this->serv->on('receive', [$this, 'onReceive']);
        $this->serv->on('task', [$this, 'onTask']);
        $this->serv->on('finish', [$this, 'onFinish']);

        echo "swoole start...\n";
        $this->serv->start();
    }

    /**
     * Gracefully restart the workers of the running server.
     *
     * Sends SIGUSR1 (or SIGUSR2 when $onlyReloadTaskWorker is true) to the
     * manager process, falling back to the master process when the PID file
     * holds no manager PID.
     *
     * @return bool True when the signal was delivered, false when the server
     *              is not running or the signal could not be sent.
     */
    public function reload()
    {
        list($masterPid, $managerPid) = $this->readPids();

        if (!$masterPid || !$this->status($masterPid)) {
            return false;
        }

        $signal = $this->onlyReloadTaskWorker ? SIGUSR2 : SIGUSR1;
        $targetPid = $managerPid ? $managerPid : $masterPid;

        if (!\Swoole\Process::kill($targetPid, $signal)) {
            return false;
        }

        echo "swoole reload...\n";

        return true;
    }

    /**
     * Stop the running server and remove the PID file.
     *
     * @param bool $smooth When true, send SIGTERM and wait for the master to
     *                     exit before falling back to SIGKILL; otherwise
     *                     SIGKILL immediately.
     * @return bool True when a running server was stopped, false when no
     *              running server was found.
     */
    public function stop($smooth = false)
    {
        list($masterPid, $managerPid) = $this->readPids();

        if (!$masterPid || !$this->status($masterPid)) {
            return false;
        }

        if ($smooth) {
            \Swoole\Process::kill($masterPid, SIGTERM);
            // Process::kill() reports a missing process by returning false, not
            // by throwing, so poll with a deadline instead of spinning forever.
            if (!$this->waitForExit($masterPid, 10)) {
                \Swoole\Process::kill($masterPid, SIGKILL);
            }
        } else {
            \Swoole\Process::kill($masterPid, SIGKILL);
        }

        // Only signal the manager if it is still around.
        if ($managerPid && \Swoole\Process::kill($managerPid, 0)) {
            \Swoole\Process::kill($managerPid, SIGKILL);
        }

        // Last resort: kill any leftover server processes by command line.
        // `-f` matches the full command line against ONE pattern; the `[a]`
        // bracket keeps the pattern from matching the shell that runs pkill.
        if (function_exists('exec')) {
            exec("pkill -9 -f '[a]rtisan swoole:task start'");
        }

        if (is_file($this->pidPath)) {
            unlink($this->pidPath);
        }

        echo "swoole stop...\n";

        return true;
    }

    /**
     * Report whether the server's master process is alive.
     *
     * @param int|string $masterPid PID to probe; when empty it is read from
     *                              the PID file.
     * @return bool Result of probing the PID with signal 0; false when no PID
     *              is known.
     */
    public function status($masterPid = 0)
    {
        if (empty($masterPid)) {
            list($masterPid) = $this->readPids();
        }

        if (empty($masterPid)) {
            return false;
        }

        return \Swoole\Process::kill((int) $masterPid, 0);
    }

    /**
     * Server "start" callback: records the PIDs and installs the periodic timer.
     *
     * @param \Swoole\Server $serv
     */
    public function onStart($serv)
    {
        $pidDir = dirname($this->pidPath);
        // mkdir() takes (path, permissions, recursive); permissions are octal.
        if (!is_dir($pidDir) && !mkdir($pidDir, 0755, true) && !is_dir($pidDir)) {
            echo "Unable to create PID directory {$pidDir}\n";
        }

        // Record the process ids (master, then manager) for status/reload/stop
        file_put_contents($this->pidPath, "{$serv->master_pid}\n{$serv->manager_pid}");

        // Truncate an existing log file on startup
        $time = 0;
        $date = date('Y-m-d H:i:s');
        if (file_exists($this->options['log_file'])) {
            file_put_contents($this->options['log_file'], "Task {$date}:清空日志\n");
        }

        // Timer firing every 300000 ms (300 seconds)
        \swoole_timer_tick(300000, function ($timer) use ($serv, &$time) {
            $date = date('Y-m-d H:i:s');

            // $time counts timer ticks, so the log is truncated after more
            // than 3600 ticks. NOTE(review): confirm ticks (not seconds) is
            // the intended unit.
            if ($time > 3600 && file_exists($this->options['log_file'])) {
                $time = 0;
                file_put_contents($this->options['log_file'], "Task {$date}:清空日志\n");
            }
            $time++;

            if (!RedisService::get('caches:task:lock:music_loaded')) {
                $taskData = [
                    'taskName' => 'bgmMusic',
                    'name' => "背景音乐写表处理",
                    'date' => date('Y-m-d'),
                ];
                $res = $serv->task($taskData);
                // NOTE(review): if the third argument is a TTL in seconds, a
                // 3-5 s lock always expires before the next 300 s tick — confirm.
                RedisService::set('caches:task:lock:music_loaded', true, rand(3, 5));
                echo "[Task bgmMusic {$date}] 背景音乐写表处理:{$res}\n";
            } else {
                echo "[Task bgmMusic {$date}] 间隔时间调用\n";
            }
        });
    }

    /**
     * Connection-opened handler. Not registered by start().
     */
    public function onConnect($serv, $fd, $from_id)
    {
        $serv->send($fd, "Success {$fd}!");
    }

    /**
     * Data-received handler: acknowledges the client, then queues the raw
     * payload as an asynchronous task.
     *
     * NOTE(review): $data is forwarded as the raw string, so onTask() finds no
     * 'taskName' key for client requests — confirm whether the payload should
     * be decoded before dispatch.
     */
    public function onReceive(\Swoole\Server $serv, $fd, $from_id, $data)
    {
        echo "Get Message From Client {$fd}:{$data}\n";

        // Reply to the client synchronously
        $serv->send($fd, json_encode(['result' => 'success']));

        // Run the asynchronous task
        $serv->task($data);
    }

    /**
     * Task handler executed in a task worker.
     *
     * @param \Swoole\Server $serv
     * @param $task_id
     * @param $from_id
     * @param $data Task payload; the 'taskName' key selects the job.
     * @return false|string False when OrderSettle is outside its time window,
     *                      otherwise a fixed status string.
     */
    public function onTask(\Swoole\Server $serv, $task_id, $from_id, $data)
    {
        $date = date('Y-m-d H:i:s');
        $taskName = isset($data['taskName']) ? $data['taskName'] : '';

        switch ($taskName) {
            case 'OrderSettle': // order settlement
                // Time restriction: skipped once the clock reaches SETTLE_TIME
                if (date('H:i') >= env('SETTLE_TIME', '06:00')) {
                    echo "[Task {$taskName} {$date}] 不在运行时间段内\n";
                    return false;
                }

                // NOTE(review): WalletService and BonusService are not imported
                // in the visible part of this file, and the error is read from
                // a different service than the one invoked — confirm both.
                if ($res = WalletService::make()->orderSettle()) {
                    $res = is_array($res) && $res ? json_encode($res, JSON_UNESCAPED_UNICODE) : 'success';
                    echo "[Task {$taskName} {$date}] 订单结算结果:{$res}\n";
                } else {
                    $error = BonusService::make()->getError();
                    $error = $error ? lang($error) : 'failed';
                    echo "[Task {$taskName} {$date}] 订单结算结果:{$error}\n";
                }
                break;
            case 'bgmMusic':
                break;
        }

        return '暂无任务处理';
    }

    /**
     * Task-finished handler.
     *
     * @param \Swoole\Server $serv Server object
     * @param int $task_id Task id
     * @param string $data Data returned by the task
     */
    public function onFinish(\Swoole\Server $serv, $task_id, $data)
    {
        echo "任务处理完成...\n";
    }

    /**
     * Connection-closed handler. Not registered by start().
     *
     * The previous `$serv->close()` call was dropped: Server::close() requires
     * the connection fd, and this handler runs when the connection is already
     * closing.
     */
    public function onClose($serv, $fd, $from_id)
    {
        echo "Client {$fd} close connection\n";
    }

    /**
     * Read the master and manager PIDs from the PID file.
     *
     * @return int[] [masterPid, managerPid]; 0 for any value that is missing.
     */
    private function readPids()
    {
        if (!is_file($this->pidPath)) {
            return [0, 0];
        }

        $raw = file_get_contents($this->pidPath);
        if ($raw === false || $raw === '') {
            return [0, 0];
        }

        $lines = explode("\n", $raw);

        return [
            (int) trim($lines[0]),
            isset($lines[1]) ? (int) trim($lines[1]) : 0,
        ];
    }

    /**
     * Poll until the given process has exited or the timeout elapses.
     *
     * @param int $pid Process id to watch.
     * @param int $timeoutSeconds Maximum time to wait.
     * @return bool True when the process is gone, false on timeout.
     */
    private function waitForExit($pid, $timeoutSeconds)
    {
        $deadline = microtime(true) + $timeoutSeconds;

        while (\Swoole\Process::kill($pid, 0)) {
            if (microtime(true) >= $deadline) {
                return false;
            }
            usleep(100000); // 100 ms between probes
        }

        return true;
    }
}
|