defaultDo = $this->do; }

    /**
     * Return the shared instance, creating it on first use.
     *
     * The instance is built with `new static()` and cached in self::$instance.
     *
     * @return static
     */
    public static function instance()
    {
        if (is_null(self::$instance)) {
            self::$instance = new static();
        }

        return self::$instance;
    }

    /**
     * Push the configured job onto the message queue.
     *
     * The dispatch is attempted once and, if the result is falsy, retried one
     * more time with the same arguments. A second failure is logged. The
     * builder state is reset via clean() whether or not the dispatch succeeded.
     *
     * @param array|null $data Payload for the job; when empty, the payload
     *                         previously stored on this builder is used instead.
     * @return mixed Result of the queue call, or the result of setError() when
     *               no job class has been configured.
     */
    public function push(?array $data = null)
    {
        if (!$this->job) {
            return $this->setError('需要执行的队列类必须存在');
        }

        // Resolve the method name and argument list exactly once so the first
        // attempt, the retry and the log line all describe the same call.
        $action = $this->action();
        $values = $this->getValues($data);

        $res = QueueThink::{$action}(...$values);
        if (!$res) {
            // One retry with identical arguments before giving up.
            $res = QueueThink::{$action}(...$values);
            if (!$res) {
                Log::error('加入队列失败,参数:' . json_encode($values));
            }
        }

        $this->clean();

        return $res;
    }

    /**
     * Reset the per-dispatch builder state to its defaults.
     *
     * Note that the configured job class ($this->job) is not reset here.
     */
    public function clean()
    {
        $this->secs = 0;
        $this->data = [];
        $this->log = null;
        $this->queueName = null;
        $this->errorCount = 3;
        $this->do = $this->defaultDo;
    }

    /**
     * Pick the queue method to call.
     *
     * @return string 'later' when a delay ($this->secs) is set, otherwise 'push'.
     */
    protected function action()
    {
        return $this->secs ? 'later' : 'push';
    }

    /**
     * Build the positional argument list for the queue call.
     *
     * This method does not modify the builder, so it is safe to call more
     * than once for the same dispatch.
     *
     * @param array|null $data Payload override; falls back to $this->data when empty.
     * @return array [secs, job, jobData, queueName] when a delay is set,
     *               otherwise [job, jobData, queueName].
     */
    protected function getValues($data)
    {
        $jobData = [
            'data'       => $data ?: $this->data,
            'do'         => $this->do,
            'errorCount' => $this->errorCount,
            'log'        => $this->log,
        ];

        // A non-default handler method is addressed as "<job>@<prefix><do>".
        // Build that name locally: appending to $this->job would stack the
        // suffix on every call and leave it behind after clean().
        $job = $this->job;
        if ($this->do != $this->defaultDo) {
            $job .= '@' . Config::get('queue.prefix', 'eb_') . $this->do;
        }

        if ($this->secs) {
            return [$this->secs, $job, $jobData, $this->queueName];
        }

        return [$job, $jobData, $this->queueName];
    }

    /**
     * Fluent setter for the properties whitelisted in $this->rules.
     *
     * Calling `data(...)` stores the full argument list; any other whitelisted
     * name stores only its first argument (or null when none is given).
     *
     * @param string $name      Name of the property to set.
     * @param array  $arguments Arguments passed to the magic call.
     * @return $this
     * @throws \RuntimeException When $name is not listed in $this->rules.
     */
    public function __call($name, $arguments)
    {
        if (!in_array($name, $this->rules, true)) {
            throw new \RuntimeException('Method does not exist' . __CLASS__ . '->' . $name . '()');
        }

        if ($name === 'data') {
            $this->{$name} = $arguments;
        } else {
            $this->{$name} = $arguments[0] ?? null;
        }

        return $this;
    }
}