QueueWorkerTest.php 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629
  1. <?php
  2. namespace Illuminate\Tests\Queue;
  3. use Exception;
  4. use Illuminate\Container\Container;
  5. use Illuminate\Contracts\Debug\ExceptionHandler;
  6. use Illuminate\Contracts\Events\Dispatcher;
  7. use Illuminate\Contracts\Queue\Job as QueueJobContract;
  8. use Illuminate\Queue\Events\JobExceptionOccurred;
  9. use Illuminate\Queue\Events\JobProcessed;
  10. use Illuminate\Queue\Events\JobProcessing;
  11. use Illuminate\Queue\MaxAttemptsExceededException;
  12. use Illuminate\Queue\QueueManager;
  13. use Illuminate\Queue\Worker;
  14. use Illuminate\Queue\WorkerOptions;
  15. use Illuminate\Support\Carbon;
  16. use Mockery as m;
  17. use PHPUnit\Framework\TestCase;
  18. use RuntimeException;
  19. class QueueWorkerTest extends TestCase
  20. {
  21. public $events;
  22. public $exceptionHandler;
  23. protected function setUp(): void
  24. {
  25. $this->events = m::spy(Dispatcher::class);
  26. $this->exceptionHandler = m::spy(ExceptionHandler::class);
  27. Container::setInstance($container = new Container);
  28. $container->instance(Dispatcher::class, $this->events);
  29. $container->instance(ExceptionHandler::class, $this->exceptionHandler);
  30. }
  31. protected function tearDown(): void
  32. {
  33. Container::setInstance(null);
  34. }
  35. public function testJobCanBeFired()
  36. {
  37. $worker = $this->getWorker('default', ['queue' => [$job = new WorkerFakeJob]]);
  38. $worker->runNextJob('default', 'queue', new WorkerOptions);
  39. $this->assertTrue($job->fired);
  40. $this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessing::class))->once();
  41. $this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessed::class))->once();
  42. }
  43. public function testWorkerCanWorkUntilQueueIsEmpty()
  44. {
  45. $workerOptions = new WorkerOptions;
  46. $workerOptions->stopWhenEmpty = true;
  47. $worker = $this->getWorker('default', ['queue' => [
  48. $firstJob = new WorkerFakeJob,
  49. $secondJob = new WorkerFakeJob,
  50. ]]);
  51. $status = $worker->daemon('default', 'queue', $workerOptions);
  52. $this->assertTrue($secondJob->fired);
  53. $this->assertSame(0, $status);
  54. $this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessing::class))->twice();
  55. $this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessed::class))->twice();
  56. }
  57. public function testWorkerStopsWhenMemoryExceeded()
  58. {
  59. $workerOptions = new WorkerOptions;
  60. $worker = $this->getWorker('default', ['queue' => [
  61. $firstJob = new WorkerFakeJob,
  62. $secondJob = new WorkerFakeJob,
  63. ]]);
  64. $worker->stopOnMemoryExceeded = true;
  65. $status = $worker->daemon('default', 'queue', $workerOptions);
  66. $this->assertTrue($firstJob->fired);
  67. $this->assertFalse($secondJob->fired);
  68. $this->assertSame(12, $status);
  69. $this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessing::class))->once();
  70. $this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessed::class))->once();
  71. }
  72. public function testJobCanBeFiredBasedOnPriority()
  73. {
  74. $worker = $this->getWorker('default', [
  75. 'high' => [$highJob = new WorkerFakeJob, $secondHighJob = new WorkerFakeJob], 'low' => [$lowJob = new WorkerFakeJob],
  76. ]);
  77. $worker->runNextJob('default', 'high,low', new WorkerOptions);
  78. $this->assertTrue($highJob->fired);
  79. $this->assertFalse($secondHighJob->fired);
  80. $this->assertFalse($lowJob->fired);
  81. $worker->runNextJob('default', 'high,low', new WorkerOptions);
  82. $this->assertTrue($secondHighJob->fired);
  83. $this->assertFalse($lowJob->fired);
  84. $worker->runNextJob('default', 'high,low', new WorkerOptions);
  85. $this->assertTrue($lowJob->fired);
  86. }
  87. public function testExceptionIsReportedIfConnectionThrowsExceptionOnJobPop()
  88. {
  89. $worker = new InsomniacWorker(
  90. new WorkerFakeManager('default', new BrokenQueueConnection($e = new RuntimeException)),
  91. $this->events,
  92. $this->exceptionHandler,
  93. function () {
  94. return false;
  95. }
  96. );
  97. $worker->runNextJob('default', 'queue', $this->workerOptions());
  98. $this->exceptionHandler->shouldHaveReceived('report')->with($e);
  99. }
  100. public function testWorkerSleepsWhenQueueIsEmpty()
  101. {
  102. $worker = $this->getWorker('default', ['queue' => []]);
  103. $worker->runNextJob('default', 'queue', $this->workerOptions(['sleep' => 5]));
  104. $this->assertEquals(5, $worker->sleptFor);
  105. }
  106. public function testJobIsReleasedOnException()
  107. {
  108. $e = new RuntimeException;
  109. $job = new WorkerFakeJob(function () use ($e) {
  110. throw $e;
  111. });
  112. $worker = $this->getWorker('default', ['queue' => [$job]]);
  113. $worker->runNextJob('default', 'queue', $this->workerOptions(['backoff' => 10]));
  114. $this->assertEquals(10, $job->releaseAfter);
  115. $this->assertFalse($job->deleted);
  116. $this->exceptionHandler->shouldHaveReceived('report')->with($e);
  117. $this->events->shouldHaveReceived('dispatch')->with(m::type(JobExceptionOccurred::class))->once();
  118. $this->events->shouldNotHaveReceived('dispatch', [m::type(JobProcessed::class)]);
  119. }
  120. public function testJobIsNotReleasedIfItHasExceededMaxAttempts()
  121. {
  122. $e = new RuntimeException;
  123. $job = new WorkerFakeJob(function ($job) use ($e) {
  124. // In normal use this would be incremented by being popped off the queue
  125. $job->attempts++;
  126. throw $e;
  127. });
  128. $job->attempts = 1;
  129. $worker = $this->getWorker('default', ['queue' => [$job]]);
  130. $worker->runNextJob('default', 'queue', $this->workerOptions(['maxTries' => 1]));
  131. $this->assertNull($job->releaseAfter);
  132. $this->assertTrue($job->deleted);
  133. $this->assertEquals($e, $job->failedWith);
  134. $this->exceptionHandler->shouldHaveReceived('report')->with($e);
  135. $this->events->shouldHaveReceived('dispatch')->with(m::type(JobExceptionOccurred::class))->once();
  136. $this->events->shouldNotHaveReceived('dispatch', [m::type(JobProcessed::class)]);
  137. }
  138. public function testJobIsNotReleasedIfItHasExpired()
  139. {
  140. $e = new RuntimeException;
  141. $job = new WorkerFakeJob(function ($job) use ($e) {
  142. // In normal use this would be incremented by being popped off the queue
  143. $job->attempts++;
  144. throw $e;
  145. });
  146. $job->retryUntil = now()->addSeconds(1)->getTimestamp();
  147. $job->attempts = 0;
  148. Carbon::setTestNow(
  149. Carbon::now()->addSeconds(1)
  150. );
  151. $worker = $this->getWorker('default', ['queue' => [$job]]);
  152. $worker->runNextJob('default', 'queue', $this->workerOptions());
  153. $this->assertNull($job->releaseAfter);
  154. $this->assertTrue($job->deleted);
  155. $this->assertEquals($e, $job->failedWith);
  156. $this->exceptionHandler->shouldHaveReceived('report')->with($e);
  157. $this->events->shouldHaveReceived('dispatch')->with(m::type(JobExceptionOccurred::class))->once();
  158. $this->events->shouldNotHaveReceived('dispatch', [m::type(JobProcessed::class)]);
  159. }
  160. public function testJobIsFailedIfItHasAlreadyExceededMaxAttempts()
  161. {
  162. $job = new WorkerFakeJob(function ($job) {
  163. $job->attempts++;
  164. });
  165. $job->attempts = 2;
  166. $worker = $this->getWorker('default', ['queue' => [$job]]);
  167. $worker->runNextJob('default', 'queue', $this->workerOptions(['maxTries' => 1]));
  168. $this->assertNull($job->releaseAfter);
  169. $this->assertTrue($job->deleted);
  170. $this->assertInstanceOf(MaxAttemptsExceededException::class, $job->failedWith);
  171. $this->exceptionHandler->shouldHaveReceived('report')->with(m::type(MaxAttemptsExceededException::class));
  172. $this->events->shouldHaveReceived('dispatch')->with(m::type(JobExceptionOccurred::class))->once();
  173. $this->events->shouldNotHaveReceived('dispatch', [m::type(JobProcessed::class)]);
  174. }
  175. public function testJobIsFailedIfItHasAlreadyExpired()
  176. {
  177. $job = new WorkerFakeJob(function ($job) {
  178. $job->attempts++;
  179. });
  180. $job->retryUntil = Carbon::now()->addSeconds(2)->getTimestamp();
  181. $job->attempts = 1;
  182. Carbon::setTestNow(
  183. Carbon::now()->addSeconds(3)
  184. );
  185. $worker = $this->getWorker('default', ['queue' => [$job]]);
  186. $worker->runNextJob('default', 'queue', $this->workerOptions());
  187. $this->assertNull($job->releaseAfter);
  188. $this->assertTrue($job->deleted);
  189. $this->assertInstanceOf(MaxAttemptsExceededException::class, $job->failedWith);
  190. $this->exceptionHandler->shouldHaveReceived('report')->with(m::type(MaxAttemptsExceededException::class));
  191. $this->events->shouldHaveReceived('dispatch')->with(m::type(JobExceptionOccurred::class))->once();
  192. $this->events->shouldNotHaveReceived('dispatch', [m::type(JobProcessed::class)]);
  193. }
  194. public function testJobBasedMaxRetries()
  195. {
  196. $job = new WorkerFakeJob(function ($job) {
  197. $job->attempts++;
  198. });
  199. $job->attempts = 2;
  200. $job->maxTries = 10;
  201. $worker = $this->getWorker('default', ['queue' => [$job]]);
  202. $worker->runNextJob('default', 'queue', $this->workerOptions(['maxTries' => 1]));
  203. $this->assertFalse($job->deleted);
  204. $this->assertNull($job->failedWith);
  205. }
  206. public function testJobBasedFailedDelay()
  207. {
  208. $job = new WorkerFakeJob(function ($job) {
  209. throw new Exception('Something went wrong.');
  210. });
  211. $job->attempts = 1;
  212. $job->backoff = 10;
  213. $worker = $this->getWorker('default', ['queue' => [$job]]);
  214. $worker->runNextJob('default', 'queue', $this->workerOptions(['backoff' => 3, 'maxTries' => 0]));
  215. $this->assertEquals(10, $job->releaseAfter);
  216. }
  217. public function testJobRunsIfAppIsNotInMaintenanceMode()
  218. {
  219. $firstJob = new WorkerFakeJob(function ($job) {
  220. $job->attempts++;
  221. });
  222. $secondJob = new WorkerFakeJob(function ($job) {
  223. $job->attempts++;
  224. });
  225. $this->maintenanceFlags = [false, true];
  226. $maintenanceModeChecker = function () {
  227. if ($this->maintenanceFlags) {
  228. return array_shift($this->maintenanceFlags);
  229. }
  230. throw new LoopBreakerException;
  231. };
  232. $worker = $this->getWorker('default', ['queue' => [$firstJob, $secondJob]], $maintenanceModeChecker);
  233. try {
  234. $worker->daemon('default', 'queue', $this->workerOptions());
  235. $this->fail('Expected LoopBreakerException to be thrown');
  236. } catch (LoopBreakerException $e) {
  237. $this->assertSame(1, $firstJob->attempts);
  238. $this->assertSame(0, $secondJob->attempts);
  239. }
  240. }
  241. public function testJobDoesNotFireIfDeleted()
  242. {
  243. $job = new WorkerFakeJob(function () {
  244. return true;
  245. });
  246. $worker = $this->getWorker('default', ['queue' => [$job]]);
  247. $job->delete();
  248. $worker->runNextJob('default', 'queue', $this->workerOptions());
  249. $this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessed::class))->once();
  250. $this->assertFalse($job->hasFailed());
  251. $this->assertFalse($job->isReleased());
  252. $this->assertTrue($job->isDeleted());
  253. }
  254. public function testWorkerPicksJobUsingCustomCallbacks()
  255. {
  256. $worker = $this->getWorker('default', [
  257. 'default' => [$defaultJob = new WorkerFakeJob], 'custom' => [$customJob = new WorkerFakeJob],
  258. ]);
  259. $worker->runNextJob('default', 'default', new WorkerOptions);
  260. $worker->runNextJob('default', 'default', new WorkerOptions);
  261. $this->assertTrue($defaultJob->fired);
  262. $this->assertFalse($customJob->fired);
  263. $worker2 = $this->getWorker('default', [
  264. 'default' => [$defaultJob = new WorkerFakeJob], 'custom' => [$customJob = new WorkerFakeJob],
  265. ]);
  266. $worker2->setName('myworker');
  267. Worker::popUsing('myworker', function ($pop) {
  268. return $pop('custom');
  269. });
  270. $worker2->runNextJob('default', 'default', new WorkerOptions);
  271. $worker2->runNextJob('default', 'default', new WorkerOptions);
  272. $this->assertFalse($defaultJob->fired);
  273. $this->assertTrue($customJob->fired);
  274. Worker::popUsing('myworker', null);
  275. }
  276. /**
  277. * Helpers...
  278. */
  279. private function getWorker($connectionName = 'default', $jobs = [], ?callable $isInMaintenanceMode = null)
  280. {
  281. return new InsomniacWorker(
  282. ...$this->workerDependencies($connectionName, $jobs, $isInMaintenanceMode)
  283. );
  284. }
  285. private function workerDependencies($connectionName = 'default', $jobs = [], ?callable $isInMaintenanceMode = null)
  286. {
  287. return [
  288. new WorkerFakeManager($connectionName, new WorkerFakeConnection($jobs)),
  289. $this->events,
  290. $this->exceptionHandler,
  291. $isInMaintenanceMode ?? function () {
  292. return false;
  293. },
  294. ];
  295. }
  296. private function workerOptions(array $overrides = [])
  297. {
  298. $options = new WorkerOptions;
  299. foreach ($overrides as $key => $value) {
  300. $options->{$key} = $value;
  301. }
  302. return $options;
  303. }
  304. }
  305. /**
  306. * Fakes.
  307. */
  308. class InsomniacWorker extends Worker
  309. {
  310. public $sleptFor;
  311. public $stopOnMemoryExceeded = false;
  312. public function sleep($seconds)
  313. {
  314. $this->sleptFor = $seconds;
  315. }
  316. public function stop($status = 0)
  317. {
  318. return $status;
  319. }
  320. public function daemonShouldRun(WorkerOptions $options, $connectionName, $queue)
  321. {
  322. return ! ($this->isDownForMaintenance)();
  323. }
  324. public function memoryExceeded($memoryLimit)
  325. {
  326. return $this->stopOnMemoryExceeded;
  327. }
  328. }
  329. class WorkerFakeManager extends QueueManager
  330. {
  331. public $connections = [];
  332. public function __construct($name, $connection)
  333. {
  334. $this->connections[$name] = $connection;
  335. }
  336. public function connection($name = null)
  337. {
  338. return $this->connections[$name];
  339. }
  340. }
  341. class WorkerFakeConnection
  342. {
  343. public $jobs = [];
  344. public function __construct($jobs)
  345. {
  346. $this->jobs = $jobs;
  347. }
  348. public function pop($queue)
  349. {
  350. return array_shift($this->jobs[$queue]);
  351. }
  352. }
  353. class BrokenQueueConnection
  354. {
  355. public $exception;
  356. public function __construct($exception)
  357. {
  358. $this->exception = $exception;
  359. }
  360. public function pop($queue)
  361. {
  362. throw $this->exception;
  363. }
  364. }
  365. class WorkerFakeJob implements QueueJobContract
  366. {
  367. public $id = '';
  368. public $fired = false;
  369. public $callback;
  370. public $deleted = false;
  371. public $releaseAfter;
  372. public $released = false;
  373. public $maxTries;
  374. public $maxExceptions;
  375. public $shouldFailOnTimeout = false;
  376. public $uuid;
  377. public $backoff;
  378. public $retryUntil;
  379. public $attempts = 0;
  380. public $failedWith;
  381. public $failed = false;
  382. public $connectionName = '';
  383. public $queue = '';
  384. public $rawBody = '';
  385. public function __construct($callback = null)
  386. {
  387. $this->callback = $callback ?: function () {
  388. //
  389. };
  390. }
  391. public function getJobId()
  392. {
  393. return $this->id;
  394. }
  395. public function fire()
  396. {
  397. $this->fired = true;
  398. $this->callback->__invoke($this);
  399. }
  400. public function payload()
  401. {
  402. return [];
  403. }
  404. public function maxTries()
  405. {
  406. return $this->maxTries;
  407. }
  408. public function maxExceptions()
  409. {
  410. return $this->maxExceptions;
  411. }
  412. public function shouldFailOnTimeout()
  413. {
  414. return $this->shouldFailOnTimeout;
  415. }
  416. public function uuid()
  417. {
  418. return $this->uuid;
  419. }
  420. public function backoff()
  421. {
  422. return $this->backoff;
  423. }
  424. public function retryUntil()
  425. {
  426. return $this->retryUntil;
  427. }
  428. public function delete()
  429. {
  430. $this->deleted = true;
  431. }
  432. public function isDeleted()
  433. {
  434. return $this->deleted;
  435. }
  436. public function release($delay = 0)
  437. {
  438. $this->released = true;
  439. $this->releaseAfter = $delay;
  440. }
  441. public function isReleased()
  442. {
  443. return $this->released;
  444. }
  445. public function isDeletedOrReleased()
  446. {
  447. return $this->deleted || $this->released;
  448. }
  449. public function attempts()
  450. {
  451. return $this->attempts;
  452. }
  453. public function markAsFailed()
  454. {
  455. $this->failed = true;
  456. }
  457. public function fail($e = null)
  458. {
  459. $this->markAsFailed();
  460. $this->delete();
  461. $this->failedWith = $e;
  462. }
  463. public function hasFailed()
  464. {
  465. return $this->failed;
  466. }
  467. public function getName()
  468. {
  469. return 'WorkerFakeJob';
  470. }
  471. public function resolveName()
  472. {
  473. return $this->getName();
  474. }
  475. public function getConnectionName()
  476. {
  477. return $this->connectionName;
  478. }
  479. public function getQueue()
  480. {
  481. return $this->queue;
  482. }
  483. public function getRawBody()
  484. {
  485. return $this->rawBody;
  486. }
  487. public function timeout()
  488. {
  489. return time() + 60;
  490. }
  491. }
  492. class LoopBreakerException extends RuntimeException
  493. {
  494. //
  495. }