BaseJob.php 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. <?php
  2. namespace jobs;
  3. use interfaces\JobInterface;
  4. use think\Facade\Db;
  5. use think\facade\Log;
  6. use think\queue\Job;
  /**
   * Base class for message-queue jobs.
   *
   * The queue payload selects which method of the concrete job to run
   * (see {@see BaseJob::fire()} for the payload keys that are read).
   *
   * Class BaseJob
   * @package jobs
   */
  class BaseJob implements JobInterface
  {
      /**
       * Forwards a call to an undefined/inaccessible method to fire().
       *
       * The requested method name is ignored; the call arguments are spread
       * into fire(), so they are expected to be the job followed by its data.
       *
       * @param string $name      Name of the method that was called (unused)
       * @param array  $arguments Arguments of the call, passed on to fire()
       */
      public function __call($name, $arguments)
      {
          $this->fire(...$arguments);
      }

      /**
       * Runs the task described by the payload and settles the queue job.
       *
       * Payload keys read from $data:
       *  - 'do'         name of the method to invoke on this object (default 'doJob')
       *  - 'data'       arguments spread into that method (default [])
       *  - 'errorCount' maximum number of attempts before giving up (default 0;
       *                 a falsy value means the job is released again on every failure)
       *  - 'log'        message, or callable producing one, handed to info() (default null)
       *
       * Outcome:
       *  - method missing              -> job is deleted
       *  - method returns a truthy     -> job is deleted, log is printed
       *  - falsy result, limit reached -> job is deleted, log is printed, failure is recorded
       *  - falsy result otherwise      -> job is released back onto the queue
       *  - anything throws             -> error is logged, job is deleted, failure is recorded
       *
       * @param Job   $job  Queue job being processed
       * @param mixed $data Payload of the job (array-style access is used)
       */
      public function fire(Job $job, $data): void
      {
          try {
              $action = $data['do'] ?? 'doJob';          // task (method) name
              $infoData = $data['data'] ?? [];           // arguments for the task
              $errorCount = $data['errorCount'] ?? 0;    // maximum number of attempts
              $log = $data['log'] ?? null;

              if (!method_exists($this, $action)) {
                  // Nothing can handle this payload; drop the job.
                  $job->delete();
                  return;
              }

              if ($this->{$action}(...$infoData)) {
                  // Task succeeded: remove the job and print the log message.
                  $job->delete();
                  $this->info($log);
                  return;
              }

              if ($job->attempts() >= $errorCount && $errorCount) {
                  // Attempt limit reached: give up and record the failure.
                  $job->delete();
                  // NOTE(review): the same message as on success is printed here - confirm this is intended.
                  $this->info($log);
                  $this->insertLog($job->attempts(), $job->getRawBody());
                  return;
              }

              // Put the job back onto the queue for another attempt.
              $job->release();
          } catch (\Throwable $e) {
              // Log first, so the original error is kept even if the clean-up below throws.
              Log::error('执行消息队列发成错误,错误原因:' . $e->getMessage());
              $job->delete();
              $this->insertLog($job->attempts(), $job->getRawBody(), $e->getMessage());
          }
      }

      /**
       * Prints the log message of a job to standard output.
       *
       * A callable is invoked and its result printed; a string is printed as is;
       * an array is printed in print_r() format. Every output is followed by "\r\n".
       * Any other value (including null) prints nothing. Errors raised while
       * producing the message are caught and their message is printed instead.
       *
       * @param callable|string|array|null $log Message, or a callable returning it
       */
      protected function info($log)
      {
          try {
              if (is_callable($log)) {
                  // NOTE(review): is_callable() is also true for a string naming an existing
                  // function (and for [class, method] arrays); such a value is invoked, not printed.
                  echo $log() . "\r\n";
              } elseif (is_string($log)) {
                  echo $log . "\r\n";
              } elseif (is_array($log)) {
                  // Arrays cannot be concatenated with a string; render them explicitly.
                  echo print_r($log, true) . "\r\n";
              }
          } catch (\Throwable $e) {
              echo $e->getMessage();
          }
      }

      /**
       * Hook invoked when the job has failed. Does nothing in this class.
       *
       * @param mixed $data Payload of the failed job
       * @param mixed $e    Error that caused the failure
       */
      public function failed($data, $e)
      {
      }

      /**
       * Records a failed job in the `failed_jobs` table.
       *
       * The connection and queue columns are filled from the `queue.default`
       * and `queue.queue` environment settings.
       *
       * @param int         $attempts  Number of attempts made so far
       * @param string      $payload   Raw body of the job
       * @param string|null $exception Error message, if the failure came from an exception
       */
      protected function insertLog(int $attempts, string $payload, ?string $exception = '')
      {
          Db::name('failed_jobs')->insert([
              'attempts' => $attempts,
              'connection' => env('queue.default'),
              'queue' => env('queue.queue'),
              'payload' => $payload,
              'exception' => $exception,
          ]);
      }
  }