aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDreamy Jazz <wpgbrown@wikimedia.org>2024-12-05 16:47:38 +0000
committerDreamy Jazz <wpgbrown@wikimedia.org>2024-12-05 20:13:56 +0000
commit92f00eea802edb827bb08c303d8875767fa13793 (patch)
tree858d54597dce9528e2efbbfe23341ca022c82714
parent7943b23bbb646d055adf49e9c150405a9dda9717 (diff)
downloadmediawikicore-92f00eea802edb827bb08c303d8875767fa13793.tar.gz
mediawikicore-92f00eea802edb827bb08c303d8875767fa13793.zip
Silence inserts to the job table in JobQueueDB
Why: * The JobQueueDB class is the default method used to queue jobs in MediaWiki and uses the job table to store queued jobs. * To insert a job into the queue the class needs to perform writes to the job table, which is done when a transaction is about to commit or there is no transaction active. * Therefore, when a job is inserted to the queue and the job queue uses the JobQueueDB class, it causes a write either immediately or soon after the call. * This means that on GET requests, the write violates the TransactionProfiler expectations. * There is nothing that we can do here to avoid the writes on GET requests, because some code needs to ensure the job is inserted before sending a response. Therefore, we should silence the warnings regarding performing writes. What: * Wrap calls to get a primary DB and inserting to the primary DB in a scope which silences the TransactionProfiler warnings related to using a primary DB connection. * Create tests for JobQueueDB so that the code being added is covered by tests. Bug: T379766 Change-Id: Ibd894ab0b99398429f3d8e9849fc0608f774e6cf
-rw-r--r--includes/jobqueue/JobQueueDB.php10
-rw-r--r--tests/phpunit/integration/includes/jobqueue/JobQueueDBTest.php174
2 files changed, 184 insertions, 0 deletions
diff --git a/includes/jobqueue/JobQueueDB.php b/includes/jobqueue/JobQueueDB.php
index 29f6117cf9d5..95fff17c8436 100644
--- a/includes/jobqueue/JobQueueDB.php
+++ b/includes/jobqueue/JobQueueDB.php
@@ -26,6 +26,7 @@ use Wikimedia\Rdbms\IReadableDatabase;
use Wikimedia\Rdbms\RawSQLValue;
use Wikimedia\Rdbms\SelectQueryBuilder;
use Wikimedia\Rdbms\ServerInfo;
+use Wikimedia\ScopedCallback;
/**
* Database-backed job queue storage.
@@ -210,7 +211,11 @@ class JobQueueDB extends JobQueue {
* @return void
*/
protected function doBatchPush( array $jobs, $flags ) {
+ // Silence expectations related to getting a primary DB, as we have to get a primary DB to insert the job.
+ $transactionProfiler = Profiler::instance()->getTransactionProfiler();
+ $scope = $transactionProfiler->silenceForScope();
$dbw = $this->getPrimaryDB();
+ ScopedCallback::consume( $scope );
// In general, there will be two cases here:
// a) sqlite; DB connection is probably a regular round-aware handle.
// If the connection is busy with a transaction, then defer the job writes
@@ -280,6 +285,10 @@ class JobQueueDB extends JobQueue {
}
// Build the full list of job rows to insert
$rows = array_merge( $rowList, array_values( $rowSet ) );
+ // Silence expectations related to inserting to the job table, because we have to perform the inserts to
+ // track the job.
+ $transactionProfiler = Profiler::instance()->getTransactionProfiler();
+ $scope = $transactionProfiler->silenceForScope();
// Insert the job rows in chunks to avoid replica DB lag...
foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
$dbw->newInsertQueryBuilder()
@@ -287,6 +296,7 @@ class JobQueueDB extends JobQueue {
->rows( $rowBatch )
->caller( $method )->execute();
}
+ ScopedCallback::consume( $scope );
$this->incrStats( 'inserts', $this->type, count( $rows ) );
$this->incrStats( 'dupe_inserts', $this->type,
count( $rowSet ) + count( $rowList ) - count( $rows )
diff --git a/tests/phpunit/integration/includes/jobqueue/JobQueueDBTest.php b/tests/phpunit/integration/includes/jobqueue/JobQueueDBTest.php
new file mode 100644
index 000000000000..3d1f5006a3ec
--- /dev/null
+++ b/tests/phpunit/integration/includes/jobqueue/JobQueueDBTest.php
@@ -0,0 +1,174 @@
+<?php
+
+namespace MediaWiki\Tests\Integration\JobQueue;
+
+use JobQueue;
+use JobQueueDB;
+use JobSpecification;
+use MediaWiki\Title\Title;
+use MediaWiki\WikiMap\WikiMap;
+use MediaWikiIntegrationTestCase;
+use Profiler;
+use Psr\Log\LoggerInterface;
+use Psr\Log\NullLogger;
+
+/**
+ * @covers JobQueueDB
+ * @group JobQueue
+ * @group Database
+ */
+class JobQueueDBTest extends MediaWikiIntegrationTestCase {
+
+ private function newJobQueue(): JobQueueDB {
+ $jobQueue = JobQueue::factory( [
+ 'class' => JobQueueDB::class,
+ 'domain' => WikiMap::getCurrentWikiDbDomain()->getId(),
+ 'type' => 'null',
+ 'idGenerator' => $this->getServiceContainer()->getGlobalIdGenerator(),
+ 'claimTTL' => 3600,
+ ] );
+ $this->assertInstanceOf( JobQueueDB::class, $jobQueue );
+ return $jobQueue;
+ }
+
+ /**
+ * @return JobSpecification A job which has a parameter named 'id' which should be a unique enough ID to
+ * assert that two given jobs are either the same or not the same.
+ */
+ private function newJobSpecification() {
+ return new JobSpecification(
+ 'null',
+ [ 'customParameter' => null, 'id' => mt_rand( 1, 0xFFFFFF ) ],
+ [],
+ Title::makeTitle( NS_MAIN, 'Custom title' )
+ );
+ }
+
+ private function addQueuedJobs( JobQueueDB $jobQueue, int $numOfQueuedJobs ) {
+ for ( $i = 0; $i < $numOfQueuedJobs; $i++ ) {
+ $jobQueue->push( $this->newJobSpecification() );
+ }
+ }
+
+ private function addAcquiredJobs( JobQueueDB $jobQueue, int $numOfAcquiredJobs ) {
+ for ( $i = 0; $i < $numOfAcquiredJobs; $i++ ) {
+ $jobQueue->push( $this->newJobSpecification() );
+ $jobQueue->pop();
+ }
+ }
+
+ /** @dataProvider provideIsEmpty */
+ public function testIsEmpty( $numOfQueuedJobs, $numOfAcquiredJobs, $expectedReturnValue ) {
+ $jobQueue = $this->newJobQueue();
+ $this->addQueuedJobs( $jobQueue, $numOfQueuedJobs );
+ $this->addAcquiredJobs( $jobQueue, $numOfAcquiredJobs );
+ $this->assertSame( $expectedReturnValue, $jobQueue->isEmpty() );
+ }
+
+ public static function provideIsEmpty() {
+ return [
+ 'Job queue is empty' => [ 0, 0, true ],
+ 'Job queue has all acquired jobs' => [ 0, 2, true ],
+ 'Job queue has all queued jobs' => [ 2, 0, false ],
+ 'Job queue has a mix of acquired and queued jobs' => [ 2, 1, false ],
+ ];
+ }
+
+ /** @dataProvider provideGetSize */
+ public function testGetSize( $numOfQueuedJobs, $numOfAcquiredJobs, $expectedReturnValue ) {
+ $jobQueue = $this->newJobQueue();
+ $this->addQueuedJobs( $jobQueue, $numOfQueuedJobs );
+ $this->addAcquiredJobs( $jobQueue, $numOfAcquiredJobs );
+ $this->assertSame( $expectedReturnValue, $jobQueue->getSize() );
+ }
+
+ public static function provideGetSize() {
+ return [
+ 'Job queue is empty' => [ 0, 0, 0 ],
+ 'Job queue has all acquired jobs' => [ 0, 2, 0 ],
+ 'Job queue has all queued jobs' => [ 2, 0, 2 ],
+ 'Job queue has a mix of acquired and queued jobs' => [ 2, 1, 2 ],
+ ];
+ }
+
+ /** @dataProvider provideGetAcquiredCount */
+ public function testGetAcquiredCount( $numOfQueuedJobs, $numOfAcquiredJobs, $expectedReturnValue ) {
+ $jobQueue = $this->newJobQueue();
+ $this->addQueuedJobs( $jobQueue, $numOfQueuedJobs );
+ $this->addAcquiredJobs( $jobQueue, $numOfAcquiredJobs );
+ $this->assertSame( $expectedReturnValue, $jobQueue->getAcquiredCount() );
+ }
+
+ public static function provideGetAcquiredCount() {
+ return [
+ 'Job queue is empty' => [ 0, 0, 0 ],
+ 'Job queue has all acquired jobs' => [ 0, 2, 2 ],
+ 'Job queue has all queued jobs' => [ 2, 0, 0 ],
+ 'Job queue has a mix of acquired and queued jobs' => [ 2, 1, 1 ],
+ ];
+ }
+
+ public function testGetAllQueuedJobsWhenJobQueueEmpty() {
+ $this->assertCount( 0, $this->newJobQueue()->getAllQueuedJobs() );
+ }
+
+ public function testGetAllQueuedJobs() {
+ // Queue a job into the job queue
+ $jobQueue = $this->newJobQueue();
+ $firstJob = $this->newJobSpecification();
+ $jobQueue->push( $firstJob );
+ // Check that ::getAllQueuedJobs returns the job we just queued into the queue.
+ $allQueuedJobs = iterator_to_array( $jobQueue->getAllQueuedJobs() );
+ $this->assertCount( 1, $allQueuedJobs );
+ $this->assertSame( $firstJob->getParams()['id'], $allQueuedJobs[0]->getParams()['id'] );
+ }
+
+ public function testGetAllAcquiredJobsWhenJobQueueEmpty() {
+ $this->assertCount( 0, $this->newJobQueue()->getAllAcquiredJobs() );
+ }
+
+ public function testGetAllAcquiredJobs() {
+ // Add a job to the queue
+ $queue = $this->newJobQueue();
+ $queuedJob = $this->newJobSpecification();
+ $queue->push( $queuedJob );
+ $this->assertCount( 0, $queue->getAllAcquiredJobs() );
+
+ // Pop the job off the queue and check that it is the correct job
+ $job = $queue->pop();
+ $this->assertNotFalse( $job );
+ $this->assertSame( $queuedJob->getParams()['id'], $job->getParams()['id'] );
+
+ // Check that ::getAllAcquiredJobs returns the job we just popped off the queue
+ $acquiredJobs = iterator_to_array( $queue->getAllAcquiredJobs() );
+ $this->assertCount( 1, $acquiredJobs );
+ $this->assertSame( $job->getParams()['id'], $acquiredJobs[0]->getParams()['id'] );
+ }
+
+ public function testPushWhenTransactionProfilerExpectsNoWrites() {
+ // Narrow the expectations to be no writes and no master connections.
+ $transactionProfiler = Profiler::instance()->getTransactionProfiler();
+ $transactionProfiler->setExpectations(
+ [ 'writes' => 0, 'masterConns' => 0 ],
+ __METHOD__
+ );
+ // Expect no calls to the LoggerInterface used by the profiler. We can't use ::createNoOpMock because we need
+ // to reset the transaction profiler to clean up after the test.
+ $logCreated = false;
+ $mockLogger = $this->createMock( LoggerInterface::class );
+ $mockLogger->method( $this->anythingBut( [ '__debugInfo', '__destruct' ] ) )
+ ->willReturnCallback( static function () use ( &$logCreated ) {
+ $logCreated = true;
+ } );
+ $transactionProfiler->setLogger( $mockLogger );
+
+ // Push a job into the queue, which should cause a write to occur without trying to log a violated exception.
+ $queue = $this->newJobQueue();
+ $queue->push( $this->newJobSpecification() );
+ $this->assertFalse( $logCreated );
+
+ // Reset the logger and expectations to clean up after ourselves.
+ $transactionProfiler->setLogger( new NullLogger() );
+ $transactionProfiler->resetExpectations();
+ }
+}