| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- <?php
- namespace jobs;
- use interfaces\JobInterface;
- use think\Facade\Db;
- use think\facade\Log;
- use think\queue\Job;
/**
 * Base class for message-queue jobs.
 *
 * The queue payload handled by fire() names the method to run and the
 * arguments to run it with; see fire() for the keys that are read.
 *
 * Class BaseJob
 * @package jobs
 */
class BaseJob implements JobInterface
{
    /**
     * Forward a call to an undefined or inaccessible method to fire().
     *
     * @param string $name      Name of the method that was called (not used).
     * @param array  $arguments Arguments of the call; they are spread into
     *                          fire(), so they must be a Job followed by the payload.
     */
    public function __call($name, $arguments)
    {
        $this->fire(...$arguments);
    }

    /**
     * Run the method named in the payload and settle the job afterwards.
     *
     * Keys read from $data:
     *  - 'do'         method of this object to call (default 'doJob');
     *  - 'data'       arguments spread into that method (default []);
     *  - 'errorCount' attempt limit; a falsy value means the job is released
     *                 again after every failed run (default 0);
     *  - 'log'        value handed to info() (default null).
     *
     * A truthy return value from the method counts as success. Anything thrown
     * while processing is logged, the job is deleted and a failed_jobs row is
     * written.
     *
     * @param Job   $job  The queue job being processed.
     * @param mixed $data The job payload, read as an array with the keys above.
     */
    public function fire(Job $job, $data): void
    {
        try {
            $action = $data['do'] ?? 'doJob';        // name of the method to run
            $infoData = $data['data'] ?? [];         // arguments for that method
            $errorCount = $data['errorCount'] ?? 0;  // attempt limit (falsy = no limit)
            $log = $data['log'] ?? null;             // message for info()

            if (!method_exists($this, $action)) {
                // Nothing can handle this payload, so drop the job.
                $job->delete();
                return;
            }

            if ($this->{$action}(...$infoData)) {
                // Success: remove the job and print its message.
                $job->delete();
                $this->info($log);
                return;
            }

            if ($job->attempts() >= $errorCount && $errorCount) {
                // Attempt limit reached: remove the job, print its message and
                // record the failure.
                $job->delete();
                $this->info($log);
                $this->insertLog($job->attempts(), $job->getRawBody());
                return;
            }

            // Put the job back on the queue for another attempt.
            $job->release();
        } catch (\Throwable $e) {
            // Log the cause first so it is not lost if the clean-up below throws.
            Log::error('执行消息队列发成错误,错误原因:' . $e->getMessage());
            $job->delete();
            $this->insertLog($job->attempts(), $job->getRawBody(), $e->getMessage());
        }
    }

    /**
     * Print the job's message followed by "\r\n".
     *
     * A callable is invoked and its result printed, a string is printed as is,
     * and an array is printed in print_r() format. Any other value prints
     * nothing. Errors raised while printing are caught and their message is
     * printed instead.
     *
     * NOTE(review): is_callable() is also true for a plain string that names an
     * existing function, so such a string is invoked rather than printed —
     * confirm that log values cannot collide with function names.
     *
     * @param callable|string|array|null $log
     */
    protected function info($log)
    {
        try {
            if (is_callable($log)) {
                echo $log() . "\r\n";
            } elseif (is_string($log)) {
                echo $log . "\r\n";
            } elseif (is_array($log)) {
                // Concatenating an array would only yield the word "Array" plus
                // a conversion warning; render its contents instead.
                echo print_r($log, true) . "\r\n";
            }
        } catch (\Throwable $e) {
            echo $e->getMessage();
        }
    }

    /**
     * Hook for a failed job; does nothing in this base class.
     *
     * @param mixed $data
     * @param mixed $e
     */
    public function failed($data, $e)
    {
    }

    /**
     * Insert a row describing a failed job into the failed_jobs table.
     *
     * The connection and queue columns are read from the 'queue.default' and
     * 'queue.queue' environment settings.
     *
     * @param int         $attempts  Number of attempts made so far.
     * @param string      $payload   Raw body of the job.
     * @param string|null $exception Error message, if any.
     */
    protected function insertLog(int $attempts, string $payload, ?string $exception = '')
    {
        // Fully qualified on purpose: the file-level import is spelled
        // think\Facade\Db, which may not autoload on a case-sensitive
        // filesystem because the framework namespace is think\facade.
        \think\facade\Db::name('failed_jobs')->insert([
            'attempts' => $attempts,
            'connection' => env('queue.default'),
            'queue' => env('queue.queue'),
            'payload' => $payload,
            'exception' => $exception,
        ]);
    }
}
|