123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629 |
- <?php
- namespace Illuminate\Tests\Queue;
- use Exception;
- use Illuminate\Container\Container;
- use Illuminate\Contracts\Debug\ExceptionHandler;
- use Illuminate\Contracts\Events\Dispatcher;
- use Illuminate\Contracts\Queue\Job as QueueJobContract;
- use Illuminate\Queue\Events\JobExceptionOccurred;
- use Illuminate\Queue\Events\JobProcessed;
- use Illuminate\Queue\Events\JobProcessing;
- use Illuminate\Queue\MaxAttemptsExceededException;
- use Illuminate\Queue\QueueManager;
- use Illuminate\Queue\Worker;
- use Illuminate\Queue\WorkerOptions;
- use Illuminate\Support\Carbon;
- use Mockery as m;
- use PHPUnit\Framework\TestCase;
- use RuntimeException;
/**
 * Exercises the queue worker against in-memory fakes.
 *
 * Every test builds an InsomniacWorker around a fake manager/connection pair and
 * inspects the fake jobs plus the dispatcher and exception-handler spies afterwards.
 */
class QueueWorkerTest extends TestCase
{
    /**
     * Spy standing in for the event dispatcher handed to every worker.
     *
     * @var \Mockery\MockInterface
     */
    public $events;

    /**
     * Spy standing in for the exception handler handed to every worker.
     *
     * @var \Mockery\MockInterface
     */
    public $exceptionHandler;

    /**
     * Maintenance-mode answers consumed one at a time by the maintenance-mode test.
     *
     * Declared explicitly because creating properties dynamically is deprecated
     * as of PHP 8.2.
     *
     * @var bool[]
     */
    public $maintenanceFlags = [];

    /**
     * Create fresh spies and register them in a new container instance.
     */
    protected function setUp(): void
    {
        $this->events = m::spy(Dispatcher::class);
        $this->exceptionHandler = m::spy(ExceptionHandler::class);

        Container::setInstance($container = new Container);

        $container->instance(Dispatcher::class, $this->events);
        $container->instance(ExceptionHandler::class, $this->exceptionHandler);
    }

    /**
     * Undo every piece of global state a test may have touched.
     */
    protected function tearDown(): void
    {
        // Two tests freeze the clock via Carbon::setTestNow(); release it so the
        // frozen time cannot leak into whichever test runs next.
        Carbon::setTestNow();

        Container::setInstance(null);

        // Release the spies held by Mockery's global container.
        m::close();
    }

    /**
     * A single queued job is fired and both processing events are dispatched once.
     */
    public function testJobCanBeFired()
    {
        $worker = $this->getWorker('default', ['queue' => [$job = new WorkerFakeJob]]);

        $worker->runNextJob('default', 'queue', new WorkerOptions);

        $this->assertTrue($job->fired);
        $this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessing::class))->once();
        $this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessed::class))->once();
    }

    /**
     * With stopWhenEmpty set, the daemon drains the queue and returns status 0.
     */
    public function testWorkerCanWorkUntilQueueIsEmpty()
    {
        $workerOptions = new WorkerOptions;
        $workerOptions->stopWhenEmpty = true;

        $worker = $this->getWorker('default', ['queue' => [
            $firstJob = new WorkerFakeJob,
            $secondJob = new WorkerFakeJob,
        ]]);

        $status = $worker->daemon('default', 'queue', $workerOptions);

        $this->assertTrue($secondJob->fired);
        $this->assertSame(0, $status);
        $this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessing::class))->twice();
        $this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessed::class))->twice();
    }

    /**
     * When the memory check reports "exceeded", the daemon stops after the first
     * job and returns status 12.
     */
    public function testWorkerStopsWhenMemoryExceeded()
    {
        $workerOptions = new WorkerOptions;

        $worker = $this->getWorker('default', ['queue' => [
            $firstJob = new WorkerFakeJob,
            $secondJob = new WorkerFakeJob,
        ]]);

        // InsomniacWorker::memoryExceeded() simply returns this flag.
        $worker->stopOnMemoryExceeded = true;

        $status = $worker->daemon('default', 'queue', $workerOptions);

        $this->assertTrue($firstJob->fired);
        $this->assertFalse($secondJob->fired);
        $this->assertSame(12, $status);
        $this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessing::class))->once();
        $this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessed::class))->once();
    }

    /**
     * Queues listed first in a comma-separated list are drained before later ones.
     */
    public function testJobCanBeFiredBasedOnPriority()
    {
        $worker = $this->getWorker('default', [
            'high' => [$highJob = new WorkerFakeJob, $secondHighJob = new WorkerFakeJob],
            'low' => [$lowJob = new WorkerFakeJob],
        ]);

        $worker->runNextJob('default', 'high,low', new WorkerOptions);
        $this->assertTrue($highJob->fired);
        $this->assertFalse($secondHighJob->fired);
        $this->assertFalse($lowJob->fired);

        $worker->runNextJob('default', 'high,low', new WorkerOptions);
        $this->assertTrue($secondHighJob->fired);
        $this->assertFalse($lowJob->fired);

        $worker->runNextJob('default', 'high,low', new WorkerOptions);
        $this->assertTrue($lowJob->fired);
    }

    /**
     * An exception thrown by the connection while popping is passed to the handler.
     */
    public function testExceptionIsReportedIfConnectionThrowsExceptionOnJobPop()
    {
        $worker = new InsomniacWorker(
            new WorkerFakeManager('default', new BrokenQueueConnection($e = new RuntimeException)),
            $this->events,
            $this->exceptionHandler,
            function () {
                return false;
            }
        );

        $worker->runNextJob('default', 'queue', $this->workerOptions());

        $this->exceptionHandler->shouldHaveReceived('report')->with($e);
    }

    /**
     * An empty queue makes the worker sleep for the configured number of seconds.
     */
    public function testWorkerSleepsWhenQueueIsEmpty()
    {
        $worker = $this->getWorker('default', ['queue' => []]);

        $worker->runNextJob('default', 'queue', $this->workerOptions(['sleep' => 5]));

        $this->assertEquals(5, $worker->sleptFor);
    }

    /**
     * A throwing job is released with the configured backoff and is not deleted.
     */
    public function testJobIsReleasedOnException()
    {
        $e = new RuntimeException;

        $job = new WorkerFakeJob(function () use ($e) {
            throw $e;
        });

        $worker = $this->getWorker('default', ['queue' => [$job]]);
        $worker->runNextJob('default', 'queue', $this->workerOptions(['backoff' => 10]));

        $this->assertEquals(10, $job->releaseAfter);
        $this->assertFalse($job->deleted);
        $this->exceptionHandler->shouldHaveReceived('report')->with($e);
        $this->events->shouldHaveReceived('dispatch')->with(m::type(JobExceptionOccurred::class))->once();
        $this->events->shouldNotHaveReceived('dispatch', [m::type(JobProcessed::class)]);
    }

    /**
     * A job that throws after passing its max attempts is failed, not released.
     */
    public function testJobIsNotReleasedIfItHasExceededMaxAttempts()
    {
        $e = new RuntimeException;

        $job = new WorkerFakeJob(function ($job) use ($e) {
            // In normal use this would be incremented by being popped off the queue
            $job->attempts++;

            throw $e;
        });
        $job->attempts = 1;

        $worker = $this->getWorker('default', ['queue' => [$job]]);
        $worker->runNextJob('default', 'queue', $this->workerOptions(['maxTries' => 1]));

        $this->assertNull($job->releaseAfter);
        $this->assertTrue($job->deleted);
        $this->assertEquals($e, $job->failedWith);
        $this->exceptionHandler->shouldHaveReceived('report')->with($e);
        $this->events->shouldHaveReceived('dispatch')->with(m::type(JobExceptionOccurred::class))->once();
        $this->events->shouldNotHaveReceived('dispatch', [m::type(JobProcessed::class)]);
    }

    /**
     * A job that throws once its retryUntil deadline has been reached is failed,
     * not released.
     */
    public function testJobIsNotReleasedIfItHasExpired()
    {
        $e = new RuntimeException;

        $job = new WorkerFakeJob(function ($job) use ($e) {
            // In normal use this would be incremented by being popped off the queue
            $job->attempts++;

            throw $e;
        });

        // Uses the imported Carbon class, like every other time-based test in this file.
        $job->retryUntil = Carbon::now()->addSeconds(1)->getTimestamp();
        $job->attempts = 0;

        // Move the clock forward to the deadline before the job runs.
        Carbon::setTestNow(
            Carbon::now()->addSeconds(1)
        );

        $worker = $this->getWorker('default', ['queue' => [$job]]);
        $worker->runNextJob('default', 'queue', $this->workerOptions());

        $this->assertNull($job->releaseAfter);
        $this->assertTrue($job->deleted);
        $this->assertEquals($e, $job->failedWith);
        $this->exceptionHandler->shouldHaveReceived('report')->with($e);
        $this->events->shouldHaveReceived('dispatch')->with(m::type(JobExceptionOccurred::class))->once();
        $this->events->shouldNotHaveReceived('dispatch', [m::type(JobProcessed::class)]);
    }

    /**
     * A job popped with more attempts than allowed is failed with
     * MaxAttemptsExceededException.
     */
    public function testJobIsFailedIfItHasAlreadyExceededMaxAttempts()
    {
        $job = new WorkerFakeJob(function ($job) {
            $job->attempts++;
        });
        $job->attempts = 2;

        $worker = $this->getWorker('default', ['queue' => [$job]]);
        $worker->runNextJob('default', 'queue', $this->workerOptions(['maxTries' => 1]));

        $this->assertNull($job->releaseAfter);
        $this->assertTrue($job->deleted);
        $this->assertInstanceOf(MaxAttemptsExceededException::class, $job->failedWith);
        $this->exceptionHandler->shouldHaveReceived('report')->with(m::type(MaxAttemptsExceededException::class));
        $this->events->shouldHaveReceived('dispatch')->with(m::type(JobExceptionOccurred::class))->once();
        $this->events->shouldNotHaveReceived('dispatch', [m::type(JobProcessed::class)]);
    }

    /**
     * A job popped after its retryUntil deadline is failed with
     * MaxAttemptsExceededException.
     */
    public function testJobIsFailedIfItHasAlreadyExpired()
    {
        $job = new WorkerFakeJob(function ($job) {
            $job->attempts++;
        });
        $job->retryUntil = Carbon::now()->addSeconds(2)->getTimestamp();
        $job->attempts = 1;

        // Move the clock past the deadline before the job is picked up.
        Carbon::setTestNow(
            Carbon::now()->addSeconds(3)
        );

        $worker = $this->getWorker('default', ['queue' => [$job]]);
        $worker->runNextJob('default', 'queue', $this->workerOptions());

        $this->assertNull($job->releaseAfter);
        $this->assertTrue($job->deleted);
        $this->assertInstanceOf(MaxAttemptsExceededException::class, $job->failedWith);
        $this->exceptionHandler->shouldHaveReceived('report')->with(m::type(MaxAttemptsExceededException::class));
        $this->events->shouldHaveReceived('dispatch')->with(m::type(JobExceptionOccurred::class))->once();
        $this->events->shouldNotHaveReceived('dispatch', [m::type(JobProcessed::class)]);
    }

    /**
     * The job's own maxTries value wins over the worker-level maxTries option.
     */
    public function testJobBasedMaxRetries()
    {
        $job = new WorkerFakeJob(function ($job) {
            $job->attempts++;
        });
        $job->attempts = 2;
        $job->maxTries = 10;

        $worker = $this->getWorker('default', ['queue' => [$job]]);
        $worker->runNextJob('default', 'queue', $this->workerOptions(['maxTries' => 1]));

        $this->assertFalse($job->deleted);
        $this->assertNull($job->failedWith);
    }

    /**
     * The job's own backoff value wins over the worker-level backoff option.
     */
    public function testJobBasedFailedDelay()
    {
        $job = new WorkerFakeJob(function ($job) {
            throw new Exception('Something went wrong.');
        });
        $job->attempts = 1;
        $job->backoff = 10;

        $worker = $this->getWorker('default', ['queue' => [$job]]);
        $worker->runNextJob('default', 'queue', $this->workerOptions(['backoff' => 3, 'maxTries' => 0]));

        $this->assertEquals(10, $job->releaseAfter);
    }

    /**
     * The daemon runs a job while the maintenance check answers false and skips
     * the next one once it answers true.
     */
    public function testJobRunsIfAppIsNotInMaintenanceMode()
    {
        $firstJob = new WorkerFakeJob(function ($job) {
            $job->attempts++;
        });

        $secondJob = new WorkerFakeJob(function ($job) {
            $job->attempts++;
        });

        $this->maintenanceFlags = [false, true];

        // Hands out the scripted answers in order, then throws to break out of
        // the otherwise endless daemon loop.
        $maintenanceModeChecker = function () {
            if ($this->maintenanceFlags) {
                return array_shift($this->maintenanceFlags);
            }

            throw new LoopBreakerException;
        };

        $worker = $this->getWorker('default', ['queue' => [$firstJob, $secondJob]], $maintenanceModeChecker);

        try {
            $worker->daemon('default', 'queue', $this->workerOptions());

            $this->fail('Expected LoopBreakerException to be thrown');
        } catch (LoopBreakerException $e) {
            $this->assertSame(1, $firstJob->attempts);
            $this->assertSame(0, $secondJob->attempts);
        }
    }

    /**
     * A job deleted before the worker reaches it is not fired, yet the processed
     * event is still dispatched.
     */
    public function testJobDoesNotFireIfDeleted()
    {
        $job = new WorkerFakeJob(function () {
            return true;
        });

        $worker = $this->getWorker('default', ['queue' => [$job]]);
        $job->delete();
        $worker->runNextJob('default', 'queue', $this->workerOptions());

        $this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessed::class))->once();
        // Pin the behaviour named in the test title.
        $this->assertFalse($job->fired);
        $this->assertFalse($job->hasFailed());
        $this->assertFalse($job->isReleased());
        $this->assertTrue($job->isDeleted());
    }

    /**
     * A pop callback registered for a worker name overrides which queue is popped.
     */
    public function testWorkerPicksJobUsingCustomCallbacks()
    {
        $worker = $this->getWorker('default', [
            'default' => [$defaultJob = new WorkerFakeJob],
            'custom' => [$customJob = new WorkerFakeJob],
        ]);

        $worker->runNextJob('default', 'default', new WorkerOptions);
        $worker->runNextJob('default', 'default', new WorkerOptions);

        $this->assertTrue($defaultJob->fired);
        $this->assertFalse($customJob->fired);

        $worker2 = $this->getWorker('default', [
            'default' => [$defaultJob = new WorkerFakeJob],
            'custom' => [$customJob = new WorkerFakeJob],
        ]);

        $worker2->setName('myworker');

        Worker::popUsing('myworker', function ($pop) {
            return $pop('custom');
        });

        try {
            $worker2->runNextJob('default', 'default', new WorkerOptions);
            $worker2->runNextJob('default', 'default', new WorkerOptions);

            $this->assertFalse($defaultJob->fired);
            $this->assertTrue($customJob->fired);
        } finally {
            // The callback is registered statically; always unregister it so a
            // failing assertion cannot leak it into other tests.
            Worker::popUsing('myworker', null);
        }
    }

    /**
     * Helpers...
     */

    /**
     * Build an InsomniacWorker wired to a fake connection holding the given jobs.
     *
     * @param  string  $connectionName
     * @param  array  $jobs  fake jobs keyed by queue name
     * @param  callable|null  $isInMaintenanceMode  maintenance check; defaults to "never"
     * @return InsomniacWorker
     */
    private function getWorker($connectionName = 'default', $jobs = [], ?callable $isInMaintenanceMode = null)
    {
        return new InsomniacWorker(
            ...$this->workerDependencies($connectionName, $jobs, $isInMaintenanceMode)
        );
    }

    /**
     * Assemble the worker constructor arguments in positional order.
     *
     * @param  string  $connectionName
     * @param  array  $jobs  fake jobs keyed by queue name
     * @param  callable|null  $isInMaintenanceMode  maintenance check; defaults to "never"
     * @return array
     */
    private function workerDependencies($connectionName = 'default', $jobs = [], ?callable $isInMaintenanceMode = null)
    {
        return [
            new WorkerFakeManager($connectionName, new WorkerFakeConnection($jobs)),
            $this->events,
            $this->exceptionHandler,
            $isInMaintenanceMode ?? function () {
                return false;
            },
        ];
    }

    /**
     * Create WorkerOptions with the given public properties overridden.
     *
     * @param  array  $overrides  property name => value
     * @return \Illuminate\Queue\WorkerOptions
     */
    private function workerOptions(array $overrides = [])
    {
        $options = new WorkerOptions;

        foreach ($overrides as $key => $value) {
            $options->{$key} = $value;
        }

        return $options;
    }
}
- /**
- * Fakes.
- */
/**
 * Worker double that records requested sleeps instead of sleeping, returns the
 * stop status instead of acting on it, and lets tests toggle the memory check.
 */
class InsomniacWorker extends Worker
{
    /**
     * The value most recently passed to sleep(); unset until sleep() is called.
     *
     * @var mixed
     */
    public $sleptFor;

    /**
     * The answer memoryExceeded() gives, whatever limit it is asked about.
     *
     * @var bool
     */
    public $stopOnMemoryExceeded = false;

    /**
     * Record the requested duration without pausing.
     *
     * @param  int|float  $seconds
     * @return void
     */
    public function sleep($seconds)
    {
        $this->sleptFor = $seconds;
    }

    /**
     * Hand the status straight back to the caller.
     *
     * @param  int  $status
     * @return int
     */
    public function stop($status = 0)
    {
        return $status;
    }

    /**
     * The daemon should run exactly when the maintenance callback answers falsy.
     *
     * @param  \Illuminate\Queue\WorkerOptions  $options
     * @param  string  $connectionName
     * @param  string  $queue
     * @return bool
     */
    public function daemonShouldRun(WorkerOptions $options, $connectionName, $queue)
    {
        $maintenanceCheck = $this->isDownForMaintenance;

        return ! $maintenanceCheck();
    }

    /**
     * Report the scripted answer and ignore the limit.
     *
     * @param  int  $memoryLimit
     * @return bool
     */
    public function memoryExceeded($memoryLimit)
    {
        return $this->stopOnMemoryExceeded;
    }
}
/**
 * Queue manager double that serves pre-built connections from a plain array.
 */
class WorkerFakeManager extends QueueManager
{
    /**
     * Connections keyed by connection name.
     *
     * @var array
     */
    public $connections = [];

    /**
     * Register the single connection this manager serves.
     *
     * @param  string  $name
     * @param  object  $connection
     */
    public function __construct($name, $connection)
    {
        $this->connections = [$name => $connection];
    }

    /**
     * Look up a connection by the name it was registered under.
     *
     * @param  string|null  $name
     * @return object
     */
    public function connection($name = null)
    {
        $registered = $this->connections;

        return $registered[$name];
    }
}
/**
 * In-memory queue connection: holds lists of jobs keyed by queue name and hands
 * them out in first-in, first-out order.
 */
class WorkerFakeConnection
{
    /**
     * Pending jobs, keyed by queue name.
     *
     * @var array
     */
    public $jobs = [];

    /**
     * @param  array  $jobs  lists of jobs keyed by queue name
     */
    public function __construct($jobs)
    {
        $this->jobs = $jobs;
    }

    /**
     * Remove and return the next job on the given queue.
     *
     * @param  string  $queue
     * @return mixed  the next job, or null when the queue is empty or was never registered
     */
    public function pop($queue)
    {
        // Treat an unregistered queue like an empty one. Handing a missing key to
        // array_shift() would pass null, which is a TypeError on PHP 8.
        if (empty($this->jobs[$queue])) {
            return null;
        }

        return array_shift($this->jobs[$queue]);
    }
}
/**
 * Queue connection double whose pop() always throws the exception it was given.
 */
class BrokenQueueConnection
{
    /**
     * The exception thrown by every pop() call.
     *
     * @var \Throwable
     */
    public $exception;

    /**
     * @param  \Throwable  $exception  thrown by every pop() call
     */
    public function __construct($exception)
    {
        $this->exception = $exception;
    }

    /**
     * Never returns; throws the stored exception regardless of the queue name.
     *
     * @param  string  $queue
     * @return void
     *
     * @throws \Throwable
     */
    public function pop($queue)
    {
        $failure = $this->exception;

        throw $failure;
    }
}
/**
 * Queue job double: every piece of job state is a public property that tests can
 * preset and inspect, and the "work" is a callback invoked with the job itself.
 */
class WorkerFakeJob implements QueueJobContract
{
    /** @var string value returned by getJobId() */
    public $id = '';

    /** @var bool set to true as soon as fire() is called */
    public $fired = false;

    /** @var callable invoked by fire() with this job as its only argument */
    public $callback;

    /** @var bool set by delete() (and therefore by fail()) */
    public $deleted = false;

    /** @var mixed delay passed to the most recent release(); null until released */
    public $releaseAfter;

    /** @var bool set by release() */
    public $released = false;

    /** @var int|null value returned by maxTries() */
    public $maxTries;

    /** @var int|null value returned by maxExceptions() */
    public $maxExceptions;

    /** @var bool value returned by shouldFailOnTimeout() */
    public $shouldFailOnTimeout = false;

    /** @var string|null value returned by uuid() */
    public $uuid;

    /** @var mixed value returned by backoff() */
    public $backoff;

    /** @var int|null value returned by retryUntil() */
    public $retryUntil;

    /** @var int value returned by attempts() */
    public $attempts = 0;

    /** @var mixed argument passed to the most recent fail() */
    public $failedWith;

    /** @var bool set by markAsFailed() (and therefore by fail()) */
    public $failed = false;

    /** @var string value returned by getConnectionName() */
    public $connectionName = '';

    /** @var string value returned by getQueue() */
    public $queue = '';

    /** @var string value returned by getRawBody() */
    public $rawBody = '';

    /**
     * @param  callable|null  $callback  work to run in fire(); a no-op closure when omitted
     */
    public function __construct($callback = null)
    {
        if (! $callback) {
            $callback = function () {
                //
            };
        }

        $this->callback = $callback;
    }

    /** Return the preset job id. */
    public function getJobId()
    {
        return $this->id;
    }

    /** Flag the job as fired, then run the callback with this job. */
    public function fire()
    {
        $this->fired = true;

        $this->callback->__invoke($this);
    }

    /** The fake carries no payload. */
    public function payload()
    {
        return [];
    }

    /** Return the preset per-job attempt limit. */
    public function maxTries()
    {
        return $this->maxTries;
    }

    /** Return the preset per-job exception limit. */
    public function maxExceptions()
    {
        return $this->maxExceptions;
    }

    /** Return the preset fail-on-timeout flag. */
    public function shouldFailOnTimeout()
    {
        return $this->shouldFailOnTimeout;
    }

    /** Return the preset UUID. */
    public function uuid()
    {
        return $this->uuid;
    }

    /** Return the preset per-job backoff. */
    public function backoff()
    {
        return $this->backoff;
    }

    /** Return the preset retry deadline. */
    public function retryUntil()
    {
        return $this->retryUntil;
    }

    /** Flag the job as deleted. */
    public function delete()
    {
        $this->deleted = true;
    }

    /** Whether delete() has been called. */
    public function isDeleted()
    {
        return $this->deleted;
    }

    /**
     * Record the requested delay and flag the job as released.
     *
     * @param  int  $delay
     */
    public function release($delay = 0)
    {
        $this->releaseAfter = $delay;
        $this->released = true;
    }

    /** Whether release() has been called. */
    public function isReleased()
    {
        return $this->released;
    }

    /** Whether the job has been deleted or released. */
    public function isDeletedOrReleased()
    {
        return $this->isDeleted() || $this->isReleased();
    }

    /** Return the current attempt counter. */
    public function attempts()
    {
        return $this->attempts;
    }

    /** Flag the job as failed. */
    public function markAsFailed()
    {
        $this->failed = true;
    }

    /**
     * Mark the job failed and deleted, remembering what it failed with.
     *
     * @param  mixed  $e
     */
    public function fail($e = null)
    {
        $this->failedWith = $e;

        $this->markAsFailed();
        $this->delete();
    }

    /** Whether the job has been marked as failed. */
    public function hasFailed()
    {
        return $this->failed;
    }

    /** Fixed job name. */
    public function getName()
    {
        return 'WorkerFakeJob';
    }

    /** Same value as getName(). */
    public function resolveName()
    {
        return $this->getName();
    }

    /** Return the preset connection name. */
    public function getConnectionName()
    {
        return $this->connectionName;
    }

    /** Return the preset queue name. */
    public function getQueue()
    {
        return $this->queue;
    }

    /** Return the preset raw body. */
    public function getRawBody()
    {
        return $this->rawBody;
    }

    /**
     * Return the current Unix time plus sixty.
     *
     * NOTE(review): this is a timestamp-sized value rather than a short duration;
     * confirm that is what the worker is meant to receive here.
     */
    public function timeout()
    {
        return 60 + time();
    }
}
/**
 * Marker exception thrown by a test's maintenance-mode callback to break out of
 * the worker's daemon loop; adds nothing to RuntimeException.
 */
class LoopBreakerException extends RuntimeException
{
}
|