aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTimo Tijhof <krinkle@fastmail.com>2023-08-25 00:38:02 +0100
committerTimo Tijhof <krinkle@fastmail.com>2023-09-09 20:42:02 +0100
commite56552557f0da1260ea567c3c936f5c7a1a3c760 (patch)
treea8b2149ace05f4c534ba0a053966d703eb92fc2e
parent47cdcbac8b3fbefb64b70319090c1add021d195b (diff)
downloadmediawikicore-e56552557f0da1260ea567c3c936f5c7a1a3c760.tar.gz
mediawikicore-e56552557f0da1260ea567c3c936f5c7a1a3c760.zip
deferred: Decouple DeferredUpdates from MediaWikiServices
* Create task-specific methods with simple defaults that require no mocking or stubbing of any kind, as used by the pure unit tests where service container (and by extent, storage services) are disabled. * Remove all use of global variables, LBFactory, JobQueue, StatsdFactory, and RequestContext. Bug: T265749 Change-Id: If85c448d2d1b806e70f641f06263680d49c6eeec
-rw-r--r--autoload.php1
-rw-r--r--includes/Storage/DerivedPageDataUpdater.php2
-rw-r--r--includes/deferred/DeferredUpdates.php139
-rw-r--r--includes/deferred/DeferredUpdatesScopeMediaWikiStack.php133
-rw-r--r--includes/deferred/DeferredUpdatesScopeStack.php44
-rw-r--r--includes/deferred/RefreshSecondaryDataUpdate.php2
-rw-r--r--tests/phpunit/MediaWikiUnitTestCase.php3
-rw-r--r--tests/phpunit/includes/deferred/DeferredUpdatesTest.php3
8 files changed, 220 insertions, 107 deletions
diff --git a/autoload.php b/autoload.php
index e75ebf2ca7af..8e266977c7f3 100644
--- a/autoload.php
+++ b/autoload.php
@@ -344,6 +344,7 @@ $wgAutoloadLocalClasses = [
'DeferrableUpdate' => __DIR__ . '/includes/deferred/DeferrableUpdate.php',
'DeferredUpdates' => __DIR__ . '/includes/deferred/DeferredUpdates.php',
'DeferredUpdatesScope' => __DIR__ . '/includes/deferred/DeferredUpdatesScope.php',
+ 'DeferredUpdatesScopeMediaWikiStack' => __DIR__ . '/includes/deferred/DeferredUpdatesScopeMediaWikiStack.php',
'DeferredUpdatesScopeStack' => __DIR__ . '/includes/deferred/DeferredUpdatesScopeStack.php',
'Deflate' => __DIR__ . '/includes/libs/Deflate.php',
'DeleteAction' => __DIR__ . '/includes/actions/DeleteAction.php',
diff --git a/includes/Storage/DerivedPageDataUpdater.php b/includes/Storage/DerivedPageDataUpdater.php
index a8aa749f0692..9a44c7b0ce87 100644
--- a/includes/Storage/DerivedPageDataUpdater.php
+++ b/includes/Storage/DerivedPageDataUpdater.php
@@ -1827,7 +1827,7 @@ class DerivedPageDataUpdater implements IDBAccessObject, LoggerAwareInterface, P
$update->setCause( $causeAction, $causeAgent );
if ( $options['defer'] === false ) {
- DeferredUpdates::attemptUpdate( $update, $this->loadbalancerFactory );
+ DeferredUpdates::attemptUpdate( $update );
} else {
DeferredUpdates::addUpdate( $update, $options['defer'] );
}
diff --git a/includes/deferred/DeferredUpdates.php b/includes/deferred/DeferredUpdates.php
index 2d8ef23915eb..b4a1b6dc8350 100644
--- a/includes/deferred/DeferredUpdates.php
+++ b/includes/deferred/DeferredUpdates.php
@@ -19,10 +19,7 @@
*/
use MediaWiki\Logger\LoggerFactory;
-use MediaWiki\MediaWikiServices;
-use Wikimedia\Rdbms\DBTransactionError;
use Wikimedia\Rdbms\IDatabase;
-use Wikimedia\Rdbms\ILBFactory;
use Wikimedia\ScopedCallback;
/**
@@ -119,14 +116,22 @@ class DeferredUpdates {
* @return DeferredUpdatesScopeStack
*/
private static function getScopeStack(): DeferredUpdatesScopeStack {
- if ( self::$scopeStack === null ) {
- self::$scopeStack = new DeferredUpdatesScopeStack();
- }
-
+ self::$scopeStack ??= new DeferredUpdatesScopeMediaWikiStack();
return self::$scopeStack;
}
/**
+ * @param DeferredUpdatesScopeStack $scopeStack
+ * @internal Only for use in tests.
+ */
+ public static function setScopeStack( DeferredUpdatesScopeStack $scopeStack ): void {
+ if ( !defined( 'MW_PHPUNIT_TEST' ) ) {
+ throw new LogicException( 'Cannot reconfigure DeferredUpdates outside tests' );
+ }
+ self::$scopeStack = $scopeStack;
+ }
+
+ /**
* Add an update to the pending update queue for execution at the appropriate time
*
* In CLI mode, callback magic will also be used to run updates when safe
@@ -147,12 +152,8 @@ class DeferredUpdates {
* @since 1.28 Added the $stage parameter
*/
public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
- global $wgCommandLineMode;
-
self::getScopeStack()->current()->addUpdate( $update, $stage );
- if ( $wgCommandLineMode ) {
- self::tryOpportunisticExecute();
- }
+ self::tryOpportunisticExecute();
}
/**
@@ -174,24 +175,13 @@ class DeferredUpdates {
* a job in the job queue system if possible (e.g. implements EnqueueableDataUpdate)
*
* @param DeferrableUpdate $update
- * @param string $httpMethod
* @return Throwable|null
*/
- private static function run(
- DeferrableUpdate $update,
- $httpMethod
- ): ?Throwable {
+ private static function run( DeferrableUpdate $update ): ?Throwable {
$logger = LoggerFactory::getInstance( 'DeferredUpdates' );
- $services = MediaWikiServices::getInstance();
- $stats = $services->getStatsdDataFactory();
- $lbFactory = $services->getDBLoadBalancerFactory();
- $jobQueueGroupFactory = $services->getJobQueueGroupFactory();
-
- $suffix = $update instanceof DeferrableCallback ? '_' . $update->getOrigin() : '';
- $type = get_class( $update ) . $suffix;
- $stats->increment( "deferred_updates.$httpMethod.$type" );
-
+ $type = get_class( $update )
+ . ( $update instanceof DeferrableCallback ? '_' . $update->getOrigin() : '' );
$updateId = spl_object_id( $update );
$logger->debug( __METHOD__ . ": started $type #$updateId" );
@@ -199,7 +189,7 @@ class DeferredUpdates {
$startTime = microtime( true );
try {
- self::attemptUpdate( $update, $lbFactory );
+ self::attemptUpdate( $update );
} catch ( Throwable $updateException ) {
MWExceptionHandler::logException( $updateException );
$logger->error(
@@ -209,7 +199,7 @@ class DeferredUpdates {
'exception' => $updateException,
]
);
- $lbFactory->rollbackPrimaryChanges( __METHOD__ );
+ self::getScopeStack()->onRunUpdateFailed( $update );
} finally {
$walltime = microtime( true ) - $startTime;
$logger->debug( __METHOD__ . ": ended $type #$updateId, processing time: $walltime" );
@@ -218,8 +208,7 @@ class DeferredUpdates {
// Try to push the update as a job so it can run later if possible
if ( $updateException && $update instanceof EnqueueableDataUpdate ) {
try {
- $spec = $update->getAsJobSpecification();
- $jobQueueGroupFactory->makeJobQueueGroup( $spec['domain'] )->push( $spec['job'] );
+ self::getScopeStack()->queueDataUpdate( $update );
} catch ( Throwable $jobException ) {
MWExceptionHandler::logException( $jobException );
$logger->error(
@@ -229,7 +218,7 @@ class DeferredUpdates {
'exception' => $jobException,
]
);
- $lbFactory->rollbackPrimaryChanges( __METHOD__ );
+ self::getScopeStack()->onRunUpdateFailed( $update );
}
}
@@ -255,12 +244,6 @@ class DeferredUpdates {
* (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND, DeferredUpdates::ALL)
*/
public static function doUpdates( $stage = self::ALL ) {
- global $wgCommandLineMode;
-
- $httpMethod = $wgCommandLineMode
- ? 'cli'
- : strtolower( RequestContext::getMain()->getRequest()->getMethod() );
-
/** @var ErrorPageError $guiError First presentable client-level error thrown */
$guiError = null;
/** @var Throwable $exception First of any error thrown */
@@ -286,13 +269,11 @@ class DeferredUpdates {
$scope->processUpdates(
$stage,
- static function ( DeferrableUpdate $update, $activeStage )
- use ( $httpMethod, &$guiError, &$exception )
- {
+ static function ( DeferrableUpdate $update, $activeStage ) use ( &$guiError, &$exception ) {
$scopeStack = self::getScopeStack();
$childScope = $scopeStack->descend( $activeStage, $update );
try {
- $e = self::run( $update, $httpMethod );
+ $e = self::run( $update );
$guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
$exception = $exception ?: $e;
// Any addUpdate() calls between descend() and ascend() used the sub-queue.
@@ -302,10 +283,8 @@ class DeferredUpdates {
// queues as appropriate...
$childScope->processUpdates(
$activeStage,
- static function ( DeferrableUpdate $subUpdate )
- use ( $httpMethod, &$guiError, &$exception )
- {
- $e = self::run( $subUpdate, $httpMethod );
+ static function ( DeferrableUpdate $sub ) use ( &$guiError, &$exception ) {
+ $e = self::run( $sub );
$guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
$exception = $exception ?: $e;
}
@@ -332,26 +311,6 @@ class DeferredUpdates {
}
/**
- * @return bool If a transaction round is active or connection is not ready for commit()
- */
- private static function areDatabaseTransactionsActive() {
- $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
- if ( $lbFactory->hasTransactionRound()
- || !$lbFactory->isReadyForRoundOperations()
- ) {
- return true;
- }
-
- foreach ( $lbFactory->getAllLBs() as $lb ) {
- if ( $lb->hasPrimaryChanges() || $lb->explicitTrxActive() ) {
- return true;
- }
- }
-
- return false;
- }
-
- /**
* Consume and execute pending updates now if possible, instead of waiting.
*
* In web requests, updates are always deferred until the end of the request.
@@ -396,25 +355,25 @@ class DeferredUpdates {
return false;
}
- // Run the updates for this context if they will have outer transaction scope
- if ( !self::areDatabaseTransactionsActive() ) {
+ if ( self::getScopeStack()->allowOpportunisticUpdates() ) {
self::doUpdates( self::ALL );
-
return true;
}
if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
// There are a large number of pending updates and none of them can run yet.
- // The odds of losing updates due to an error increase when executing long queues
+ // The odds of losing updates due to an error increases when executing long queues
// and when large amounts of time pass while tasks are queued. Mitigate this by
- // trying to migrate updates to the job queue system (where applicable).
+ // trying to eagerly move updates to the JobQueue when possible.
+ //
+ // TODO: Do we still need this now maintenance scripts automatically call
+ // tryOpportunisticExecute from addUpdate, from every commit, and every
+ // waitForReplication call?
self::getScopeStack()->current()->consumeMatchingUpdates(
self::ALL,
EnqueueableDataUpdate::class,
static function ( EnqueueableDataUpdate $update ) {
- $spec = $update->getAsJobSpecification();
- $jobQueueGroupFactory = MediaWikiServices::getInstance()->getJobQueueGroupFactory();
- $jobQueueGroupFactory->makeJobQueueGroup( $spec['domain'] )->push( $spec['job'] );
+ self::getScopeStack()->queueDataUpdate( $update );
}
);
}
@@ -496,39 +455,13 @@ class DeferredUpdates {
* transaction round.
*
* @param DeferrableUpdate $update
- * @param ILBFactory $lbFactory
* @since 1.34
*/
- public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
- $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
- if ( !$ticket || $lbFactory->hasTransactionRound() ) {
- throw new DBTransactionError( null, "A database transaction round is pending." );
- }
-
- if ( $update instanceof DataUpdate ) {
- $update->setTransactionTicket( $ticket );
- }
-
- // Designate $update::doUpdate() as the write round owner
- $fnameTrxOwner = ( $update instanceof DeferrableCallback )
- ? $update->getOrigin()
- : get_class( $update ) . '::doUpdate';
- // Determine whether the write round will be explicit or implicit
- $useExplicitTrxRound = !(
- $update instanceof TransactionRoundAwareUpdate &&
- $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
- );
+ public static function attemptUpdate( DeferrableUpdate $update ) {
+ self::getScopeStack()->onRunUpdateStart( $update );
- // Flush any pending changes left over from an implicit transaction round
- if ( $useExplicitTrxRound ) {
- $lbFactory->beginPrimaryChanges( $fnameTrxOwner ); // new explicit round
- } else {
- $lbFactory->commitPrimaryChanges( $fnameTrxOwner ); // new implicit round
- }
- // Run the update after any stale primary DB view snapshots have been flushed
$update->doUpdate();
- // Commit any pending changes from the explicit or implicit transaction round
- $lbFactory->commitPrimaryChanges( $fnameTrxOwner );
- }
+ self::getScopeStack()->onRunUpdateEnd( $update );
+ }
}
diff --git a/includes/deferred/DeferredUpdatesScopeMediaWikiStack.php b/includes/deferred/DeferredUpdatesScopeMediaWikiStack.php
new file mode 100644
index 000000000000..add54ff337a7
--- /dev/null
+++ b/includes/deferred/DeferredUpdatesScopeMediaWikiStack.php
@@ -0,0 +1,133 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+use MediaWiki\MediaWikiServices;
+use Wikimedia\Rdbms\DBTransactionError;
+
+/**
+ * This class decouples DeferredUpdates's awareness of MediaWikiServices to ease unit testing.
+ *
+ * NOTE: As a process-level utility, it is important that MediaWikiServices::getInstance() is
+ * referenced explicitly each time, so as to not cache potentially stale references.
+ * For example after the Installer, or MediaWikiIntegrationTestCase, replace the service container.
+ *
+ * @internal For use by DeferredUpdates only
+ * @since 1.41
+ */
+class DeferredUpdatesScopeMediaWikiStack extends DeferredUpdatesScopeStack {
+
+ private function areDatabaseTransactionsActive(): bool {
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ if ( $lbFactory->hasTransactionRound()
+ || !$lbFactory->isReadyForRoundOperations()
+ ) {
+ return true;
+ }
+
+ foreach ( $lbFactory->getAllLBs() as $lb ) {
+ if ( $lb->hasPrimaryChanges() || $lb->explicitTrxActive() ) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public function allowOpportunisticUpdates(): bool {
+ global $wgCommandLineMode;
+
+ if ( !$wgCommandLineMode ) {
+ // In web req
+ return false;
+ }
+
+ // Run the updates only if they will have outer transaction scope
+ if ( $this->areDatabaseTransactionsActive() ) {
+ // transaction round is active or connection is not ready for commit()
+ return false;
+ }
+
+ return true;
+ }
+
+ public function queueDataUpdate( EnqueueableDataUpdate $update ): void {
+ $spec = $update->getAsJobSpecification();
+ $jobQueueGroupFactory = MediaWikiServices::getInstance()->getJobQueueGroupFactory();
+ $jobQueueGroupFactory->makeJobQueueGroup( $spec['domain'] )->push( $spec['job'] );
+ }
+
+ public function onRunUpdateStart( DeferrableUpdate $update ): void {
+ global $wgCommandLineMode;
+
+ // Increment a counter metric
+ $type = get_class( $update )
+ . ( $update instanceof DeferrableCallback ? '_' . $update->getOrigin() : '' );
+ $httpMethod = $wgCommandLineMode ? 'cli' : strtolower( $_SERVER['REQUEST_METHOD'] ?? 'GET' );
+ $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
+ $stats->increment( "deferred_updates.$httpMethod.$type" );
+
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
+ if ( !$ticket || $lbFactory->hasTransactionRound() ) {
+ throw new DBTransactionError( null, "A database transaction round is pending." );
+ }
+
+ if ( $update instanceof DataUpdate ) {
+ $update->setTransactionTicket( $ticket );
+ }
+
+ // Designate $update::doUpdate() as the write round owner
+ $fnameTrxOwner = ( $update instanceof DeferrableCallback )
+ ? $update->getOrigin()
+ : get_class( $update ) . '::doUpdate';
+
+ // Determine whether the write round will be explicit or implicit
+ $useExplicitTrxRound = !(
+ $update instanceof TransactionRoundAwareUpdate &&
+ $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
+ );
+
+ // Ensure any stale repeatable-read snapshot on the primary DB have been flushed
+ // before running the update. E.g. left-over from an implicit transaction round
+ if ( $useExplicitTrxRound ) {
+ // new explicit round
+ $lbFactory->beginPrimaryChanges( $fnameTrxOwner );
+ } else {
+ // new implicit round
+ $lbFactory->commitPrimaryChanges( $fnameTrxOwner );
+ }
+ }
+
+ public function onRunUpdateEnd( DeferrableUpdate $update ): void {
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+
+ $fnameTrxOwner = ( $update instanceof DeferrableCallback )
+ ? $update->getOrigin()
+ : get_class( $update ) . '::doUpdate';
+
+ // Commit any pending changes from the explicit or implicit transaction round
+ $lbFactory->commitPrimaryChanges( $fnameTrxOwner );
+ }
+
+ public function onRunUpdateFailed( DeferrableUpdate $update ): void {
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ $lbFactory->rollbackPrimaryChanges( __METHOD__ );
+ }
+}
diff --git a/includes/deferred/DeferredUpdatesScopeStack.php b/includes/deferred/DeferredUpdatesScopeStack.php
index 043faee4e8b8..3962a4048a89 100644
--- a/includes/deferred/DeferredUpdatesScopeStack.php
+++ b/includes/deferred/DeferredUpdatesScopeStack.php
@@ -75,4 +75,48 @@ class DeferredUpdatesScopeStack {
public function getRecursiveDepth() {
return count( $this->stack ) - 1;
}
+
+ /**
+ * Whether DeferredUpdates::addUpdate() may run the update right away
+ *
+ * @return bool
+ */
+ public function allowOpportunisticUpdates(): bool {
+ // Overridden in DeferredUpdatesScopeMediaWikiStack::allowOpportunisticUpdates
+ return false;
+ }
+
+ /**
+ * Queue an EnqueueableDataUpdate as a job instead
+ *
+ * @see JobQueueGroup::push
+ * @param EnqueueableDataUpdate $update
+ */
+ public function queueDataUpdate( EnqueueableDataUpdate $update ): void {
+ throw new LogicException( 'Cannot queue jobs from DeferredUpdates in standalone mode' );
+ }
+
+ /**
+ * @param DeferrableUpdate $update
+ */
+ public function onRunUpdateStart( DeferrableUpdate $update ): void {
+ // No-op
+ // Overridden in DeferredUpdatesScopeMediaWikiStack::onRunUpdateStart
+ }
+
+ /**
+ * @param DeferrableUpdate $update
+ */
+ public function onRunUpdateEnd( DeferrableUpdate $update ): void {
+ // No-op
+ // Overridden in DeferredUpdatesScopeMediaWikiStack::onRunUpdateEnd
+ }
+
+ /**
+ * @param DeferrableUpdate $update
+ */
+ public function onRunUpdateFailed( DeferrableUpdate $update ): void {
+ // No-op
+ // Overridden in DeferredUpdatesScopeMediaWikiStack::onRunUpdateFailed
+ }
}
diff --git a/includes/deferred/RefreshSecondaryDataUpdate.php b/includes/deferred/RefreshSecondaryDataUpdate.php
index 3858b3532c7c..a32b5f47bff5 100644
--- a/includes/deferred/RefreshSecondaryDataUpdate.php
+++ b/includes/deferred/RefreshSecondaryDataUpdate.php
@@ -100,7 +100,7 @@ class RefreshSecondaryDataUpdate extends DataUpdate
$e = null;
foreach ( $updates as $update ) {
try {
- DeferredUpdates::attemptUpdate( $update, $this->lbFactory );
+ DeferredUpdates::attemptUpdate( $update );
} catch ( Exception $e ) {
// Try as many updates as possible on the first pass
MWExceptionHandler::rollbackPrimaryChangesAndLog( $e );
diff --git a/tests/phpunit/MediaWikiUnitTestCase.php b/tests/phpunit/MediaWikiUnitTestCase.php
index e4f82990e140..0b4aaf3c1465 100644
--- a/tests/phpunit/MediaWikiUnitTestCase.php
+++ b/tests/phpunit/MediaWikiUnitTestCase.php
@@ -103,6 +103,8 @@ abstract class MediaWikiUnitTestCase extends TestCase {
$GLOBALS[ $key ] = $value;
}
+ // Set DeferredUpdates into standalone mode
+ DeferredUpdates::setScopeStack( new DeferredUpdatesScopeStack() );
MediaWikiServices::disallowGlobalInstanceInUnitTests();
}
@@ -160,6 +162,7 @@ abstract class MediaWikiUnitTestCase extends TestCase {
unset( $value );
MediaWikiServices::allowGlobalInstanceAfterUnitTests();
+ DeferredUpdates::setScopeStack( new DeferredUpdatesScopeMediaWikiStack() );
}
}
diff --git a/tests/phpunit/includes/deferred/DeferredUpdatesTest.php b/tests/phpunit/includes/deferred/DeferredUpdatesTest.php
index 1ed7bdb8c2b8..d53cd1d7af9d 100644
--- a/tests/phpunit/includes/deferred/DeferredUpdatesTest.php
+++ b/tests/phpunit/includes/deferred/DeferredUpdatesTest.php
@@ -356,8 +356,7 @@ class DeferredUpdatesTest extends MediaWikiIntegrationTestCase {
$called = true;
},
$fname
- ),
- $lbFactory
+ )
);
$this->assertTrue( $called, "Callback ran" );