RedisQueueIntegrationTest.php 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. <?php
  2. namespace Illuminate\Tests\Queue;
  3. use Illuminate\Container\Container;
  4. use Illuminate\Contracts\Events\Dispatcher;
  5. use Illuminate\Foundation\Testing\Concerns\InteractsWithRedis;
  6. use Illuminate\Queue\Events\JobQueued;
  7. use Illuminate\Queue\Jobs\RedisJob;
  8. use Illuminate\Queue\RedisQueue;
  9. use Illuminate\Support\Carbon;
  10. use Illuminate\Support\InteractsWithTime;
  11. use Illuminate\Support\Str;
  12. use Mockery as m;
  13. use PHPUnit\Framework\TestCase;
  14. class RedisQueueIntegrationTest extends TestCase
  15. {
  16. use InteractsWithRedis, InteractsWithTime;
  /**
   * The Redis queue instance under test; assigned by setQueue().
   *
   * @var \Illuminate\Queue\RedisQueue
   */
  private $queue;

  /**
   * The Mockery spy container that setQueue() attaches to the queue, so tests
   * can verify the calls it received (e.g. bound('events')).
   *
   * @var \Mockery\MockInterface|\Mockery\LegacyMockInterface
   */
  private $container;
  /**
   * Pin Carbon's test clock to the current instant, run the parent set-up
   * and then the InteractsWithRedis set-up.
   */
  protected function setUp(): void
  {
      $frozenNow = Carbon::now();
      Carbon::setTestNow($frozenNow);

      parent::setUp();

      $this->setUpRedis();
  }
  /**
   * Undo the per-test global state: the pinned Carbon clock, the Redis
   * fixtures and the Mockery container.
   */
  protected function tearDown(): void
  {
      try {
          Carbon::setTestNow(null);

          $this->tearDownRedis();
      } finally {
          // Close Mockery and run the parent hook last, and do so even when the
          // Redis tear-down throws, so mock state cannot leak into the next test.
          m::close();

          parent::tearDown();
      }
  }
  /**
   * Jobs whose delay lies in the past must be popped, most overdue first,
   * while a job delayed into the future stays on the delayed set.
   *
   * @dataProvider redisDriverProvider
   *
   * @param string $driver
   */
  public function testExpiredJobsArePopped($driver)
  {
      $this->setQueue($driver);

      // Job index => delay in seconds; only job 0 is scheduled in the future.
      $delays = [0 => 1000, 1 => -200, 2 => -300, 3 => -100];

      $jobs = [];

      foreach ($delays as $index => $delay) {
          $jobs[$index] = new RedisQueueIntegrationTestJob($index);

          $this->queue->later($delay, $jobs[$index]);
      }

      $this->container->shouldHaveReceived('bound')->with('events')->times(4);

      // Expected pop order follows the delays -300, -200, -100.
      foreach ([2, 1, 3] as $expectedIndex) {
          $payload = json_decode($this->queue->pop()->getRawBody());

          $this->assertEquals($jobs[$expectedIndex], unserialize($payload->data->command));
      }

      // The future job must not be handed out.
      $this->assertNull($this->queue->pop());

      $connection = $this->redis[$driver]->connection();

      $this->assertEquals(1, $connection->zcard('queues:default:delayed'));
      $this->assertEquals(3, $connection->zcard('queues:default:reserved'));
  }
  /**
   * A blocking pop must return a job that another process pushes while the
   * consumer is waiting.
   *
   * The test forks: the parent pops with blockFor = 10 and expects job 12,
   * the child sleeps one second, pushes job 12 and exits.
   *
   * @dataProvider redisDriverProvider
   * @requires extension pcntl
   *
   * @param mixed $driver
   *
   * @throws \Exception
   */
  public function testBlockingPop($driver)
  {
      // The Redis fixtures are torn down before forking and rebuilt in each process.
      $this->tearDownRedis();

      // pcntl_fork() returns the child's PID in the parent, 0 in the child and
      // -1 on failure. Store the PID before comparing it: `$pid = pcntl_fork() > 0`
      // assigns the boolean result of the comparison, which sends a failed fork
      // down the child branch and makes the failure branch unreachable.
      $pid = pcntl_fork();

      if ($pid > 0) {
          // Parent: block on the queue until the child pushes its job.
          $this->setUpRedis();
          $this->setQueue($driver, 'default', null, 60, 10);

          $payload = json_decode($this->queue->pop()->getRawBody());

          $this->assertEquals(12, unserialize($payload->data->command)->i);
      } elseif ($pid === 0) {
          // Child: give the parent time to start blocking, then push.
          $this->setUpRedis();
          // NOTE(review): the child always pushes through 'phpredis', whatever
          // $driver is — confirm this is intentional.
          $this->setQueue('phpredis');

          sleep(1);

          $this->queue->push(new RedisQueueIntegrationTestJob(12));

          exit;
      } else {
          // Rebuild the fixtures so that tearDown(), which calls tearDownRedis(),
          // runs against the state it expects.
          $this->setUpRedis();

          $this->fail('Cannot fork');
      }
  }
  /**
   * Queue 201 already-expired delayed jobs and check that they are all popped
   * in order of their delay, most overdue first.
   *
   * @dataProvider redisDriverProvider
   *
   * @param string $driver
   */
  public function testMigrateMoreThan100Jobs($driver)
  {
      $this->setQueue($driver);

      // Delays -1 .. -201 seconds; each job carries its own delay as payload.
      foreach (range(-1, -201) as $delay) {
          $this->queue->later($delay, new RedisQueueIntegrationTestJob($delay));
      }

      foreach (range(-201, -1) as $expected) {
          $payload = json_decode($this->queue->pop()->getRawBody());

          $this->assertEquals($expected, unserialize($payload->data->command)->i);

          // After popping the job with delay $expected, -$expected - 1 jobs are
          // still waiting and the notify list must have that many entries.
          $notifyLength = $this->redis[$driver]->llen('queues:default:notify');

          $this->assertEquals(-$expected - 1, $notifyLength);
      }
  }
  /**
   * Popping a pushed job must return its payload with one attempt recorded and
   * place it on the reserved set, scored at pop time plus retryAfter (60).
   *
   * @dataProvider redisDriverProvider
   *
   * @param string $driver
   */
  public function testPopProperlyPopsJobOffOfRedis($driver)
  {
      $this->setQueue($driver);

      // Arrange: one job on the queue.
      $job = new RedisQueueIntegrationTestJob(10);
      $this->queue->push($job);

      // Act: pop it, bracketing the call with timestamps.
      $before = $this->currentTime();
      /** @var \Illuminate\Queue\Jobs\RedisJob $redisJob */
      $redisJob = $this->queue->pop();
      $after = $this->currentTime();

      // Assert: raw and reserved payloads both describe the pushed job.
      $rawPayload = json_decode($redisJob->getRawBody());
      $reservedPayload = json_decode($redisJob->getReservedJob());

      $this->assertEquals($job, unserialize($rawPayload->data->command));
      $this->assertEquals(1, $redisJob->attempts());
      $this->assertEquals($job, unserialize($reservedPayload->data->command));
      $this->assertEquals(1, $reservedPayload->attempts);
      $this->assertEquals($redisJob->getJobId(), $reservedPayload->id);

      // Assert: exactly one reserved entry, with $before + 60 <= score <= $after + 60.
      $connection = $this->redis[$driver]->connection();

      $this->assertEquals(1, $connection->zcard('queues:default:reserved'));

      $result = $connection->zrangebyscore('queues:default:reserved', -INF, INF, ['withscores' => true]);
      [$reservedJob] = array_keys($result);
      $score = $result[$reservedJob];

      $this->assertLessThanOrEqual($score, $before + 60);
      $this->assertGreaterThanOrEqual($score, $after + 60);
      $this->assertEquals($job, unserialize(json_decode($reservedJob)->data->command));
  }
  /**
   * A delayed job whose delay has already elapsed must be popped and reserved
   * with a score of pop time plus retryAfter (60).
   *
   * @dataProvider redisDriverProvider
   *
   * @param string $driver
   */
  public function testPopProperlyPopsDelayedJobOffOfRedis($driver)
  {
      $this->setQueue($driver);

      // Arrange: a job delayed by -10 seconds, i.e. already due.
      $job = new RedisQueueIntegrationTestJob(10);
      $this->queue->later(-10, $job);

      // Act: pop it, bracketing the call with timestamps.
      $before = $this->currentTime();
      $poppedPayload = json_decode($this->queue->pop()->getRawBody());
      $this->assertEquals($job, unserialize($poppedPayload->data->command));
      $after = $this->currentTime();

      // Assert: exactly one reserved entry, with $before + 60 <= score <= $after + 60.
      $connection = $this->redis[$driver]->connection();

      $this->assertEquals(1, $connection->zcard('queues:default:reserved'));

      $result = $connection->zrangebyscore('queues:default:reserved', -INF, INF, ['withscores' => true]);
      [$reservedJob] = array_keys($result);
      $score = $result[$reservedJob];

      $this->assertLessThanOrEqual($score, $before + 60);
      $this->assertGreaterThanOrEqual($score, $after + 60);
      $this->assertEquals($job, unserialize(json_decode($reservedJob)->data->command));
  }
  /**
   * With retryAfter set to null, a due delayed job must still be popped and
   * its reserved score must equal the pop time, with no offset added.
   *
   * @dataProvider redisDriverProvider
   *
   * @param string $driver
   */
  public function testPopPopsDelayedJobOffOfRedisWhenExpireNull($driver)
  {
      $this->setQueue($driver, 'default', null, null);

      // Arrange: a job delayed by -10 seconds, i.e. already due.
      $job = new RedisQueueIntegrationTestJob(10);
      $this->queue->later(-10, $job);
      $this->container->shouldHaveReceived('bound')->with('events')->once();

      // Act: pop it, bracketing the call with timestamps.
      $before = $this->currentTime();
      $poppedPayload = json_decode($this->queue->pop()->getRawBody());
      $this->assertEquals($job, unserialize($poppedPayload->data->command));
      $after = $this->currentTime();

      // Assert: exactly one reserved entry, with $before <= score <= $after.
      $connection = $this->redis[$driver]->connection();

      $this->assertEquals(1, $connection->zcard('queues:default:reserved'));

      $result = $connection->zrangebyscore('queues:default:reserved', -INF, INF, ['withscores' => true]);
      [$reservedJob] = array_keys($result);
      $score = $result[$reservedJob];

      $this->assertLessThanOrEqual($score, $before);
      $this->assertGreaterThanOrEqual($score, $after);
      $this->assertEquals($job, unserialize(json_decode($reservedJob)->data->command));
  }
  /**
   * With blockFor = 5, popping a job that is already on the queue must return
   * it with the pushed command in its reserved payload.
   *
   * @dataProvider redisDriverProvider
   *
   * @param string $driver
   */
  public function testBlockingPopProperlyPopsJobOffOfRedis($driver)
  {
      $this->setQueue($driver, 'default', null, 60, 5);

      // Arrange: one job on the queue.
      $job = new RedisQueueIntegrationTestJob(10);
      $this->queue->push($job);

      // Act.
      /** @var \Illuminate\Queue\Jobs\RedisJob $redisJob */
      $redisJob = $this->queue->pop();

      // Assert.
      $this->assertNotNull($redisJob);

      $reservedPayload = json_decode($redisJob->getReservedJob());

      $this->assertEquals($job, unserialize($reservedPayload->data->command));
  }
  /**
   * With blockFor = 5, two delayed jobs that are already due must both be
   * popped, leaving the notify list and delayed set empty and two entries on
   * the reserved set.
   *
   * @dataProvider redisDriverProvider
   *
   * @param string $driver
   */
  public function testBlockingPopProperlyPopsExpiredJobs($driver)
  {
      // Every UUID generated through Str during this test is the literal 'uuid'.
      Str::createUuidsUsing(function () {
          return 'uuid';
      });

      try {
          $this->setQueue($driver, 'default', null, 60, 5);

          $jobs = [
              new RedisQueueIntegrationTestJob(0),
              new RedisQueueIntegrationTestJob(1),
          ];

          $this->queue->later(-200, $jobs[0]);
          $this->queue->later(-200, $jobs[1]);

          $this->assertEquals($jobs[0], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
          $this->assertEquals($jobs[1], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));

          $connection = $this->redis[$driver]->connection();

          $this->assertEquals(0, $connection->llen('queues:default:notify'));
          $this->assertEquals(0, $connection->zcard('queues:default:delayed'));
          $this->assertEquals(2, $connection->zcard('queues:default:reserved'));
      } finally {
          // Restore the real UUID factory even when an assertion above fails;
          // otherwise the fake factory would leak into every later test.
          Str::createUuidsNormally();
      }
  }
  /**
   * With retryAfter set to null, a previously reserved job must stay on the
   * reserved set when a second job is popped, and each entry's score must
   * equal the time at which it was popped.
   *
   * @dataProvider redisDriverProvider
   *
   * @param string $driver
   */
  public function testNotExpireJobsWhenExpireNull($driver)
  {
      $this->setQueue($driver, 'default', null, null);

      // First job: push it and pop it so that it sits on the reserved set.
      $failed = new RedisQueueIntegrationTestJob(-20);
      $this->queue->push($failed);
      $this->container->shouldHaveReceived('bound')->with('events')->once();

      $beforeFailPop = $this->currentTime();
      $this->queue->pop();
      $afterFailPop = $this->currentTime();

      // Second job: push it and verify that it is the one handed out next.
      $job = new RedisQueueIntegrationTestJob(10);
      $this->queue->push($job);
      $this->container->shouldHaveReceived('bound')->with('events')->times(2);

      $before = $this->currentTime();
      $poppedPayload = json_decode($this->queue->pop()->getRawBody());
      $this->assertEquals($job, unserialize($poppedPayload->data->command));
      $after = $this->currentTime();

      // Both jobs must now be reserved.
      $connection = $this->redis[$driver]->connection();

      $this->assertEquals(2, $connection->zcard('queues:default:reserved'));

      $result = $connection->zrangebyscore('queues:default:reserved', -INF, INF, ['withscores' => true]);

      foreach ($result as $payload => $score) {
          $command = unserialize(json_decode($payload)->data->command);

          $this->assertInstanceOf(RedisQueueIntegrationTestJob::class, $command);
          $this->assertContains($command->i, [10, -20]);

          // Select the time window in which this entry was popped.
          [$earliest, $latest] = $command->i == 10
              ? [$before, $after]
              : [$beforeFailPop, $afterFailPop];

          // $earliest <= score <= $latest
          $this->assertLessThanOrEqual($score, $earliest);
          $this->assertGreaterThanOrEqual($score, $latest);
      }
  }
  /**
   * With retryAfter = 30, a popped job must be reserved with a score of pop
   * time plus 30 seconds.
   *
   * @dataProvider redisDriverProvider
   *
   * @param string $driver
   */
  public function testExpireJobsWhenExpireSet($driver)
  {
      $this->setQueue($driver, 'default', null, 30);

      // Arrange: one job on the queue.
      $job = new RedisQueueIntegrationTestJob(10);
      $this->queue->push($job);
      $this->container->shouldHaveReceived('bound')->with('events')->once();

      // Act: pop it, bracketing the call with timestamps.
      $before = $this->currentTime();
      $poppedPayload = json_decode($this->queue->pop()->getRawBody());
      $this->assertEquals($job, unserialize($poppedPayload->data->command));
      $after = $this->currentTime();

      // Assert: exactly one reserved entry, with $before + 30 <= score <= $after + 30.
      $connection = $this->redis[$driver]->connection();

      $this->assertEquals(1, $connection->zcard('queues:default:reserved'));

      $result = $connection->zrangebyscore('queues:default:reserved', -INF, INF, ['withscores' => true]);
      [$reservedJob] = array_keys($result);
      $score = $result[$reservedJob];

      $this->assertLessThanOrEqual($score, $before + 30);
      $this->assertGreaterThanOrEqual($score, $after + 30);
      $this->assertEquals($job, unserialize(json_decode($reservedJob)->data->command));
  }
  /**
   * Releasing a popped job with a 1000 second delay must move it to the
   * delayed set, scored at release time plus 1000, with one attempt recorded,
   * and leave nothing ready to pop.
   *
   * @dataProvider redisDriverProvider
   *
   * @param string $driver
   */
  public function testRelease($driver)
  {
      $this->setQueue($driver);

      // Arrange: push a job and pop it.
      $job = new RedisQueueIntegrationTestJob(30);
      $this->queue->push($job);

      /** @var \Illuminate\Queue\Jobs\RedisJob $redisJob */
      $redisJob = $this->queue->pop();

      // Act: release it, bracketing the call with timestamps.
      $before = $this->currentTime();
      $redisJob->release(1000);
      $after = $this->currentTime();

      // Assert: one delayed entry, with $before + 1000 <= score <= $after + 1000.
      $connection = $this->redis[$driver]->connection();

      $this->assertEquals(1, $connection->zcard('queues:default:delayed'));

      $results = $connection->zrangebyscore('queues:default:delayed', -INF, INF, ['withscores' => true]);
      [$payload] = array_keys($results);
      $score = $results[$payload];

      $this->assertGreaterThanOrEqual($before + 1000, $score);
      $this->assertLessThanOrEqual($after + 1000, $score);

      $decoded = json_decode($payload);

      $this->assertEquals(1, $decoded->attempts);
      $this->assertEquals($job, unserialize($decoded->data->command));

      // The released job is not due yet, so nothing can be popped.
      $this->assertNull($this->queue->pop());
  }
  /**
   * A job released with a negative delay must be available to pop again
   * immediately.
   *
   * @dataProvider redisDriverProvider
   *
   * @param string $driver
   */
  public function testReleaseInThePast($driver)
  {
      $this->setQueue($driver);

      $this->queue->push(new RedisQueueIntegrationTestJob(30));

      /** @var \Illuminate\Queue\Jobs\RedisJob $redisJob */
      $redisJob = $this->queue->pop();
      $redisJob->release(-3);

      $poppedAgain = $this->queue->pop();

      $this->assertInstanceOf(RedisJob::class, $poppedAgain);
  }
  /**
   * Deleting a popped job must leave the delayed set, the reserved set and
   * the queue list empty, with nothing left to pop.
   *
   * @dataProvider redisDriverProvider
   *
   * @param string $driver
   */
  public function testDelete($driver)
  {
      $this->setQueue($driver);

      $this->queue->push(new RedisQueueIntegrationTestJob(30));

      /** @var \Illuminate\Queue\Jobs\RedisJob $redisJob */
      $redisJob = $this->queue->pop();
      $redisJob->delete();

      $connection = $this->redis[$driver]->connection();

      $this->assertEquals(0, $connection->zcard('queues:default:delayed'));
      $this->assertEquals(0, $connection->zcard('queues:default:reserved'));
      $this->assertEquals(0, $connection->llen('queues:default'));

      $this->assertNull($this->queue->pop());
  }
  /**
   * Clearing a queue that holds two jobs must report 2 and leave both the
   * queue and its notify list empty.
   *
   * @dataProvider redisDriverProvider
   *
   * @param string $driver
   */
  public function testClear($driver)
  {
      $this->setQueue($driver);

      foreach ([30, 40] as $value) {
          $this->queue->push(new RedisQueueIntegrationTestJob($value));
      }

      $cleared = $this->queue->clear(null);

      $this->assertEquals(2, $cleared);
      $this->assertEquals(0, $this->queue->size());

      $notifyLength = $this->redis[$driver]->connection()->llen('queues:default:notify');

      $this->assertEquals(0, $notifyLength);
  }
  /**
   * size() must count ready, delayed and reserved jobs, and only drop once a
   * reserved job is deleted.
   *
   * @dataProvider redisDriverProvider
   *
   * @param string $driver
   */
  public function testSize($driver)
  {
      $this->setQueue($driver);

      // Empty to begin with.
      $this->assertEquals(0, $this->queue->size());

      // One ready job.
      $this->queue->push(new RedisQueueIntegrationTestJob(1));
      $this->assertEquals(1, $this->queue->size());

      // Plus one job delayed by 60 seconds.
      $this->queue->later(60, new RedisQueueIntegrationTestJob(2));
      $this->assertEquals(2, $this->queue->size());

      // Plus another ready job.
      $this->queue->push(new RedisQueueIntegrationTestJob(3));
      $this->assertEquals(3, $this->queue->size());

      // Popping only reserves the job, so the size is unchanged.
      $reserved = $this->queue->pop();
      $this->assertEquals(3, $this->queue->size());

      // Deleting the reserved job removes it from the count.
      $reserved->delete();
      $this->assertEquals(2, $this->queue->size());
  }
  /**
   * Pushing a job must dispatch exactly one JobQueued event that carries the
   * pushed job instance and a string job id.
   *
   * @dataProvider redisDriverProvider
   *
   * @param string $driver
   */
  public function testPushJobQueuedEvent($driver)
  {
      $events = m::mock(Dispatcher::class);
      $events->shouldReceive('dispatch')->withArgs(function (JobQueued $jobQueued) {
          $this->assertInstanceOf(RedisQueueIntegrationTestJob::class, $jobQueued->job);
          // Assert on the event's id itself. Passing the class name as the first
          // argument would assert on a constant string and treat the id as the
          // failure message, so the id would never be checked.
          $this->assertIsString($jobQueued->id);

          return true;
      })->andReturnNull()->once();

      // The queue is expected to look up the dispatcher exactly once.
      $container = m::mock(Container::class);
      $container->shouldReceive('bound')->with('events')->andReturn(true)->once();
      $container->shouldReceive('offsetGet')->with('events')->andReturn($events)->once();

      $queue = new RedisQueue($this->redis[$driver]);
      $queue->setContainer($container);

      $queue->push(new RedisQueueIntegrationTestJob(5));
  }
  /**
   * Bulk-pushing three jobs must dispatch three JobQueued events, each
   * resolved through the container.
   *
   * @dataProvider redisDriverProvider
   *
   * @param string $driver
   */
  public function testBulkJobQueuedEvent($driver)
  {
      $dispatcher = m::mock(Dispatcher::class);
      $dispatcher->shouldReceive('dispatch')->with(m::type(JobQueued::class))->andReturnNull()->times(3);

      $app = m::mock(Container::class);
      $app->shouldReceive('bound')->with('events')->andReturn(true)->times(3);
      $app->shouldReceive('offsetGet')->with('events')->andReturn($dispatcher)->times(3);

      $redisQueue = new RedisQueue($this->redis[$driver]);
      $redisQueue->setContainer($app);

      $jobs = array_map(static function ($value) {
          return new RedisQueueIntegrationTestJob($value);
      }, [5, 10, 15]);

      $redisQueue->bulk($jobs);
  }
  /**
   * Build the RedisQueue under test for the given driver and attach a fresh
   * Mockery spy container to it.
   *
   * @param string $driver key into $this->redis selecting the Redis instance
   * @param string $default
   * @param string|null $connection
   * @param int|null $retryAfter tests pass null here as well as integers
   * @param int|null $blockFor
   */
  private function setQueue($driver, $default = 'default', $connection = null, $retryAfter = 60, $blockFor = null)
  {
      $spy = m::spy(Container::class);

      $redisQueue = new RedisQueue($this->redis[$driver], $default, $connection, $retryAfter, $blockFor);
      $redisQueue->setContainer($spy);

      $this->queue = $redisQueue;
      $this->container = $spy;
  }
  441. }
  /**
   * Minimal job fixture: it carries one public value, which the tests use to
   * tell job instances apart.
   */
  class RedisQueueIntegrationTestJob
  {
      /**
       * The value given to the constructor.
       *
       * @var mixed
       */
      public $i;

      /**
       * @param mixed $i value stored on the job
       */
      public function __construct($i)
      {
          $this->i = $i;
      }

      /**
       * Intentionally empty; this fixture does no work.
       */
      public function handle()
      {
          //
      }
  }