BusBatchTest.php 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. <?php
  2. namespace Illuminate\Tests\Bus;
  3. use Carbon\CarbonImmutable;
  4. use Illuminate\Bus\Batch;
  5. use Illuminate\Bus\Batchable;
  6. use Illuminate\Bus\BatchFactory;
  7. use Illuminate\Bus\DatabaseBatchRepository;
  8. use Illuminate\Bus\PendingBatch;
  9. use Illuminate\Bus\Queueable;
  10. use Illuminate\Container\Container;
  11. use Illuminate\Contracts\Queue\Factory;
  12. use Illuminate\Contracts\Queue\ShouldQueue;
  13. use Illuminate\Database\Capsule\Manager as DB;
  14. use Illuminate\Database\Eloquent\Model;
  15. use Illuminate\Database\PostgresConnection;
  16. use Illuminate\Foundation\Bus\Dispatchable;
  17. use Illuminate\Queue\CallQueuedClosure;
  18. use Mockery as m;
  19. use PHPUnit\Framework\TestCase;
  20. use RuntimeException;
  21. use stdClass;
  22. class BusBatchTest extends TestCase
  23. {
  24. protected function setUp(): void
  25. {
  26. $db = new DB;
  27. $db->addConnection([
  28. 'driver' => 'sqlite',
  29. 'database' => ':memory:',
  30. ]);
  31. $db->bootEloquent();
  32. $db->setAsGlobal();
  33. $this->createSchema();
  34. $_SERVER['__finally.count'] = 0;
  35. $_SERVER['__then.count'] = 0;
  36. $_SERVER['__catch.count'] = 0;
  37. }
  38. /**
  39. * Setup the database schema.
  40. *
  41. * @return void
  42. */
  43. public function createSchema()
  44. {
  45. $this->schema()->create('job_batches', function ($table) {
  46. $table->string('id')->primary();
  47. $table->string('name');
  48. $table->integer('total_jobs');
  49. $table->integer('pending_jobs');
  50. $table->integer('failed_jobs');
  51. $table->text('failed_job_ids');
  52. $table->text('options')->nullable();
  53. $table->integer('cancelled_at')->nullable();
  54. $table->integer('created_at');
  55. $table->integer('finished_at')->nullable();
  56. });
  57. }
  58. /**
  59. * Tear down the database schema.
  60. *
  61. * @return void
  62. */
  63. protected function tearDown(): void
  64. {
  65. unset($_SERVER['__finally.batch'], $_SERVER['__then.batch'], $_SERVER['__catch.batch'], $_SERVER['__catch.exception']);
  66. $this->schema()->drop('job_batches');
  67. m::close();
  68. }
  69. public function test_jobs_can_be_added_to_the_batch()
  70. {
  71. $queue = m::mock(Factory::class);
  72. $batch = $this->createTestBatch($queue);
  73. $job = new class
  74. {
  75. use Batchable;
  76. };
  77. $secondJob = new class
  78. {
  79. use Batchable;
  80. };
  81. $thirdJob = function () {
  82. };
  83. $queue->shouldReceive('connection')->once()
  84. ->with('test-connection')
  85. ->andReturn($connection = m::mock(stdClass::class));
  86. $connection->shouldReceive('bulk')->once()->with(m::on(function ($args) use ($job, $secondJob) {
  87. return
  88. $args[0] == $job &&
  89. $args[1] == $secondJob &&
  90. $args[2] instanceof CallQueuedClosure
  91. && is_string($args[2]->batchId);
  92. }), '', 'test-queue');
  93. $batch = $batch->add([$job, $secondJob, $thirdJob]);
  94. $this->assertEquals(3, $batch->totalJobs);
  95. $this->assertEquals(3, $batch->pendingJobs);
  96. $this->assertIsString($job->batchId);
  97. $this->assertInstanceOf(CarbonImmutable::class, $batch->createdAt);
  98. }
  99. public function test_processed_jobs_can_be_calculated()
  100. {
  101. $queue = m::mock(Factory::class);
  102. $batch = $this->createTestBatch($queue);
  103. $batch->totalJobs = 10;
  104. $batch->pendingJobs = 4;
  105. $this->assertEquals(6, $batch->processedJobs());
  106. $this->assertEquals(60, $batch->progress());
  107. }
  108. public function test_successful_jobs_can_be_recorded()
  109. {
  110. $queue = m::mock(Factory::class);
  111. $batch = $this->createTestBatch($queue);
  112. $job = new class
  113. {
  114. use Batchable;
  115. };
  116. $secondJob = new class
  117. {
  118. use Batchable;
  119. };
  120. $queue->shouldReceive('connection')->once()
  121. ->with('test-connection')
  122. ->andReturn($connection = m::mock(stdClass::class));
  123. $connection->shouldReceive('bulk')->once();
  124. $batch = $batch->add([$job, $secondJob]);
  125. $this->assertEquals(2, $batch->pendingJobs);
  126. $batch->recordSuccessfulJob('test-id');
  127. $batch->recordSuccessfulJob('test-id');
  128. $this->assertInstanceOf(Batch::class, $_SERVER['__finally.batch']);
  129. $this->assertInstanceOf(Batch::class, $_SERVER['__then.batch']);
  130. $batch = $batch->fresh();
  131. $this->assertEquals(0, $batch->pendingJobs);
  132. $this->assertTrue($batch->finished());
  133. $this->assertEquals(1, $_SERVER['__finally.count']);
  134. $this->assertEquals(1, $_SERVER['__then.count']);
  135. }
  136. public function test_failed_jobs_can_be_recorded_while_not_allowing_failures()
  137. {
  138. $queue = m::mock(Factory::class);
  139. $batch = $this->createTestBatch($queue, $allowFailures = false);
  140. $job = new class
  141. {
  142. use Batchable;
  143. };
  144. $secondJob = new class
  145. {
  146. use Batchable;
  147. };
  148. $queue->shouldReceive('connection')->once()
  149. ->with('test-connection')
  150. ->andReturn($connection = m::mock(stdClass::class));
  151. $connection->shouldReceive('bulk')->once();
  152. $batch = $batch->add([$job, $secondJob]);
  153. $this->assertEquals(2, $batch->pendingJobs);
  154. $batch->recordFailedJob('test-id', new RuntimeException('Something went wrong.'));
  155. $batch->recordFailedJob('test-id', new RuntimeException('Something else went wrong.'));
  156. $this->assertInstanceOf(Batch::class, $_SERVER['__finally.batch']);
  157. $this->assertFalse(isset($_SERVER['__then.batch']));
  158. $batch = $batch->fresh();
  159. $this->assertEquals(2, $batch->pendingJobs);
  160. $this->assertEquals(2, $batch->failedJobs);
  161. $this->assertTrue($batch->finished());
  162. $this->assertTrue($batch->cancelled());
  163. $this->assertEquals(1, $_SERVER['__finally.count']);
  164. $this->assertEquals(1, $_SERVER['__catch.count']);
  165. $this->assertSame('Something went wrong.', $_SERVER['__catch.exception']->getMessage());
  166. }
  167. public function test_failed_jobs_can_be_recorded_while_allowing_failures()
  168. {
  169. $queue = m::mock(Factory::class);
  170. $batch = $this->createTestBatch($queue, $allowFailures = true);
  171. $job = new class
  172. {
  173. use Batchable;
  174. };
  175. $secondJob = new class
  176. {
  177. use Batchable;
  178. };
  179. $queue->shouldReceive('connection')->once()
  180. ->with('test-connection')
  181. ->andReturn($connection = m::mock(stdClass::class));
  182. $connection->shouldReceive('bulk')->once();
  183. $batch = $batch->add([$job, $secondJob]);
  184. $this->assertEquals(2, $batch->pendingJobs);
  185. $batch->recordFailedJob('test-id', new RuntimeException('Something went wrong.'));
  186. $batch->recordFailedJob('test-id', new RuntimeException('Something else went wrong.'));
  187. // While allowing failures this batch never actually completes...
  188. $this->assertFalse(isset($_SERVER['__then.batch']));
  189. $batch = $batch->fresh();
  190. $this->assertEquals(2, $batch->pendingJobs);
  191. $this->assertEquals(2, $batch->failedJobs);
  192. $this->assertFalse($batch->finished());
  193. $this->assertFalse($batch->cancelled());
  194. $this->assertEquals(1, $_SERVER['__catch.count']);
  195. $this->assertSame('Something went wrong.', $_SERVER['__catch.exception']->getMessage());
  196. }
  197. public function test_batch_can_be_cancelled()
  198. {
  199. $queue = m::mock(Factory::class);
  200. $batch = $this->createTestBatch($queue);
  201. $batch->cancel();
  202. $batch = $batch->fresh();
  203. $this->assertTrue($batch->cancelled());
  204. }
  205. public function test_batch_can_be_deleted()
  206. {
  207. $queue = m::mock(Factory::class);
  208. $batch = $this->createTestBatch($queue);
  209. $batch->delete();
  210. $batch = $batch->fresh();
  211. $this->assertNull($batch);
  212. }
  213. public function test_batch_state_can_be_inspected()
  214. {
  215. $queue = m::mock(Factory::class);
  216. $batch = $this->createTestBatch($queue);
  217. $this->assertFalse($batch->finished());
  218. $batch->finishedAt = now();
  219. $this->assertTrue($batch->finished());
  220. $batch->options['then'] = [];
  221. $this->assertFalse($batch->hasThenCallbacks());
  222. $batch->options['then'] = [1];
  223. $this->assertTrue($batch->hasThenCallbacks());
  224. $this->assertFalse($batch->allowsFailures());
  225. $batch->options['allowFailures'] = true;
  226. $this->assertTrue($batch->allowsFailures());
  227. $this->assertFalse($batch->hasFailures());
  228. $batch->failedJobs = 1;
  229. $this->assertTrue($batch->hasFailures());
  230. $batch->options['catch'] = [];
  231. $this->assertFalse($batch->hasCatchCallbacks());
  232. $batch->options['catch'] = [1];
  233. $this->assertTrue($batch->hasCatchCallbacks());
  234. $this->assertFalse($batch->cancelled());
  235. $batch->cancelledAt = now();
  236. $this->assertTrue($batch->cancelled());
  237. $this->assertIsString(json_encode($batch));
  238. }
  239. public function test_chain_can_be_added_to_batch()
  240. {
  241. $queue = m::mock(Factory::class);
  242. $batch = $this->createTestBatch($queue);
  243. $chainHeadJob = new ChainHeadJob;
  244. $secondJob = new SecondTestJob;
  245. $thirdJob = new ThirdTestJob;
  246. $queue->shouldReceive('connection')->once()
  247. ->with('test-connection')
  248. ->andReturn($connection = m::mock(stdClass::class));
  249. $connection->shouldReceive('bulk')->once()->with(m::on(function ($args) use ($chainHeadJob, $secondJob, $thirdJob) {
  250. return
  251. $args[0] == $chainHeadJob
  252. && serialize($secondJob) == $args[0]->chained[0]
  253. && serialize($thirdJob) == $args[0]->chained[1];
  254. }), '', 'test-queue');
  255. $batch = $batch->add([
  256. [$chainHeadJob, $secondJob, $thirdJob],
  257. ]);
  258. $this->assertEquals(3, $batch->totalJobs);
  259. $this->assertEquals(3, $batch->pendingJobs);
  260. $this->assertSame('test-queue', $chainHeadJob->chainQueue);
  261. $this->assertIsString($chainHeadJob->batchId);
  262. $this->assertIsString($secondJob->batchId);
  263. $this->assertIsString($thirdJob->batchId);
  264. $this->assertInstanceOf(CarbonImmutable::class, $batch->createdAt);
  265. }
  266. public function test_options_serialization_on_postgres()
  267. {
  268. $pendingBatch = (new PendingBatch(new Container, collect()))
  269. ->onQueue('test-queue');
  270. $connection = m::spy(PostgresConnection::class);
  271. $connection->shouldReceive('table')->andReturnSelf()
  272. ->shouldReceive('where')->andReturnSelf();
  273. $repository = new DatabaseBatchRepository(
  274. new BatchFactory(m::mock(Factory::class)), $connection, 'job_batches'
  275. );
  276. $repository->store($pendingBatch);
  277. $connection->shouldHaveReceived('insert')
  278. ->withArgs(function ($argument) use ($pendingBatch) {
  279. return unserialize(base64_decode($argument['options'])) === $pendingBatch->options;
  280. });
  281. }
  282. /**
  283. * @dataProvider serializedOptions
  284. */
  285. public function test_options_unserialize_on_postgres($serialize, $options)
  286. {
  287. $factory = m::mock(BatchFactory::class);
  288. $connection = m::spy(PostgresConnection::class);
  289. $connection->shouldReceive('table->where->first')
  290. ->andReturn($m = (object) [
  291. 'id' => '',
  292. 'name' => '',
  293. 'total_jobs' => '',
  294. 'pending_jobs' => '',
  295. 'failed_jobs' => '',
  296. 'failed_job_ids' => '[]',
  297. 'options' => $serialize,
  298. 'created_at' => now()->timestamp,
  299. 'cancelled_at' => null,
  300. 'finished_at' => null,
  301. ]);
  302. $batch = (new DatabaseBatchRepository($factory, $connection, 'job_batches'));
  303. $factory->shouldReceive('make')
  304. ->withSomeOfArgs($batch, '', '', '', '', '', '', $options);
  305. $batch->find(1);
  306. }
  307. /**
  308. * @return array
  309. */
  310. public function serializedOptions()
  311. {
  312. $options = [1, 2];
  313. return [
  314. [serialize($options), $options],
  315. [base64_encode(serialize($options)), $options],
  316. ];
  317. }
  318. protected function createTestBatch($queue, $allowFailures = false)
  319. {
  320. $repository = new DatabaseBatchRepository(new BatchFactory($queue), DB::connection(), 'job_batches');
  321. $pendingBatch = (new PendingBatch(new Container, collect()))
  322. ->then(function (Batch $batch) {
  323. $_SERVER['__then.batch'] = $batch;
  324. $_SERVER['__then.count']++;
  325. })
  326. ->catch(function (Batch $batch, $e) {
  327. $_SERVER['__catch.batch'] = $batch;
  328. $_SERVER['__catch.exception'] = $e;
  329. $_SERVER['__catch.count']++;
  330. })
  331. ->finally(function (Batch $batch) {
  332. $_SERVER['__finally.batch'] = $batch;
  333. $_SERVER['__finally.count']++;
  334. })
  335. ->allowFailures($allowFailures)
  336. ->onConnection('test-connection')
  337. ->onQueue('test-queue');
  338. return $repository->store($pendingBatch);
  339. }
  340. /**
  341. * Get a database connection instance.
  342. *
  343. * @return \Illuminate\Database\Connection
  344. */
  345. protected function connection()
  346. {
  347. return Model::getConnectionResolver()->connection();
  348. }
  349. /**
  350. * Get a schema builder instance.
  351. *
  352. * @return \Illuminate\Database\Schema\Builder
  353. */
  354. protected function schema()
  355. {
  356. return $this->connection()->getSchemaBuilder();
  357. }
  358. }
  359. class ChainHeadJob implements ShouldQueue
  360. {
  361. use Dispatchable, Queueable, Batchable;
  362. }
  363. class SecondTestJob implements ShouldQueue
  364. {
  365. use Dispatchable, Queueable, Batchable;
  366. }
  367. class ThirdTestJob implements ShouldQueue
  368. {
  369. use Dispatchable, Queueable, Batchable;
  370. }