diff options
author | Timo Tijhof <krinkle@fastmail.com> | 2023-08-25 00:38:02 +0100 |
---|---|---|
committer | Timo Tijhof <krinkle@fastmail.com> | 2023-09-09 20:42:02 +0100 |
commit | e56552557f0da1260ea567c3c936f5c7a1a3c760 (patch) | |
tree | a8b2149ace05f4c534ba0a053966d703eb92fc2e | |
parent | 47cdcbac8b3fbefb64b70319090c1add021d195b (diff) | |
download | mediawikicore-e56552557f0da1260ea567c3c936f5c7a1a3c760.tar.gz mediawikicore-e56552557f0da1260ea567c3c936f5c7a1a3c760.zip |
deferred: Decouple DeferredUpdates from MediaWikiServices
* Create task-specific methods with simple defaults that require no
mocking or stubbing of any kind, as used by the pure unit tests
where service container (and by extent, storage services) are
disabled.
* Remove all use of global variables, LBFactory, JobQueue,
StatsdFactory, and RequestContext.
Bug: T265749
Change-Id: If85c448d2d1b806e70f641f06263680d49c6eeec
-rw-r--r-- | autoload.php | 1 | ||||
-rw-r--r-- | includes/Storage/DerivedPageDataUpdater.php | 2 | ||||
-rw-r--r-- | includes/deferred/DeferredUpdates.php | 139 | ||||
-rw-r--r-- | includes/deferred/DeferredUpdatesScopeMediaWikiStack.php | 133 | ||||
-rw-r--r-- | includes/deferred/DeferredUpdatesScopeStack.php | 44 | ||||
-rw-r--r-- | includes/deferred/RefreshSecondaryDataUpdate.php | 2 | ||||
-rw-r--r-- | tests/phpunit/MediaWikiUnitTestCase.php | 3 | ||||
-rw-r--r-- | tests/phpunit/includes/deferred/DeferredUpdatesTest.php | 3 |
8 files changed, 220 insertions, 107 deletions
diff --git a/autoload.php b/autoload.php index e75ebf2ca7af..8e266977c7f3 100644 --- a/autoload.php +++ b/autoload.php @@ -344,6 +344,7 @@ $wgAutoloadLocalClasses = [ 'DeferrableUpdate' => __DIR__ . '/includes/deferred/DeferrableUpdate.php', 'DeferredUpdates' => __DIR__ . '/includes/deferred/DeferredUpdates.php', 'DeferredUpdatesScope' => __DIR__ . '/includes/deferred/DeferredUpdatesScope.php', + 'DeferredUpdatesScopeMediaWikiStack' => __DIR__ . '/includes/deferred/DeferredUpdatesScopeMediaWikiStack.php', 'DeferredUpdatesScopeStack' => __DIR__ . '/includes/deferred/DeferredUpdatesScopeStack.php', 'Deflate' => __DIR__ . '/includes/libs/Deflate.php', 'DeleteAction' => __DIR__ . '/includes/actions/DeleteAction.php', diff --git a/includes/Storage/DerivedPageDataUpdater.php b/includes/Storage/DerivedPageDataUpdater.php index a8aa749f0692..9a44c7b0ce87 100644 --- a/includes/Storage/DerivedPageDataUpdater.php +++ b/includes/Storage/DerivedPageDataUpdater.php @@ -1827,7 +1827,7 @@ class DerivedPageDataUpdater implements IDBAccessObject, LoggerAwareInterface, P $update->setCause( $causeAction, $causeAgent ); if ( $options['defer'] === false ) { - DeferredUpdates::attemptUpdate( $update, $this->loadbalancerFactory ); + DeferredUpdates::attemptUpdate( $update ); } else { DeferredUpdates::addUpdate( $update, $options['defer'] ); } diff --git a/includes/deferred/DeferredUpdates.php b/includes/deferred/DeferredUpdates.php index 2d8ef23915eb..b4a1b6dc8350 100644 --- a/includes/deferred/DeferredUpdates.php +++ b/includes/deferred/DeferredUpdates.php @@ -19,10 +19,7 @@ */ use MediaWiki\Logger\LoggerFactory; -use MediaWiki\MediaWikiServices; -use Wikimedia\Rdbms\DBTransactionError; use Wikimedia\Rdbms\IDatabase; -use Wikimedia\Rdbms\ILBFactory; use Wikimedia\ScopedCallback; /** @@ -119,14 +116,22 @@ class DeferredUpdates { * @return DeferredUpdatesScopeStack */ private static function getScopeStack(): DeferredUpdatesScopeStack { - if ( self::$scopeStack === null ) { - self::$scopeStack = new DeferredUpdatesScopeStack(); - } - + self::$scopeStack ??= new DeferredUpdatesScopeMediaWikiStack(); return self::$scopeStack; } /** + * @param DeferredUpdatesScopeStack $scopeStack + * @internal Only for use in tests. + */ + public static function setScopeStack( DeferredUpdatesScopeStack $scopeStack ): void { + if ( !defined( 'MW_PHPUNIT_TEST' ) ) { + throw new LogicException( 'Cannot reconfigure DeferredUpdates outside tests' ); + } + self::$scopeStack = $scopeStack; + } + + /** * Add an update to the pending update queue for execution at the appropriate time * * In CLI mode, callback magic will also be used to run updates when safe @@ -147,12 +152,8 @@ class DeferredUpdates { * @since 1.28 Added the $stage parameter */ public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) { - global $wgCommandLineMode; - self::getScopeStack()->current()->addUpdate( $update, $stage ); - if ( $wgCommandLineMode ) { - self::tryOpportunisticExecute(); - } + self::tryOpportunisticExecute(); } /** @@ -174,24 +175,13 @@ class DeferredUpdates { * a job in the job queue system if possible (e.g. implements EnqueueableDataUpdate) * * @param DeferrableUpdate $update - * @param string $httpMethod * @return Throwable|null */ - private static function run( - DeferrableUpdate $update, - $httpMethod - ): ?Throwable { + private static function run( DeferrableUpdate $update ): ?Throwable { $logger = LoggerFactory::getInstance( 'DeferredUpdates' ); - $services = MediaWikiServices::getInstance(); - $stats = $services->getStatsdDataFactory(); - $lbFactory = $services->getDBLoadBalancerFactory(); - $jobQueueGroupFactory = $services->getJobQueueGroupFactory(); - - $suffix = $update instanceof DeferrableCallback ? '_' . $update->getOrigin() : ''; - $type = get_class( $update ) . $suffix; - $stats->increment( "deferred_updates.$httpMethod.$type" ); - + $type = get_class( $update ) + . ( $update instanceof DeferrableCallback ? '_' . $update->getOrigin() : '' ); $updateId = spl_object_id( $update ); $logger->debug( __METHOD__ . ": started $type #$updateId" ); @@ -199,7 +189,7 @@ class DeferredUpdates { $startTime = microtime( true ); try { - self::attemptUpdate( $update, $lbFactory ); + self::attemptUpdate( $update ); } catch ( Throwable $updateException ) { MWExceptionHandler::logException( $updateException ); $logger->error( @@ -209,7 +199,7 @@ class DeferredUpdates { 'exception' => $updateException, ] ); - $lbFactory->rollbackPrimaryChanges( __METHOD__ ); + self::getScopeStack()->onRunUpdateFailed( $update ); } finally { $walltime = microtime( true ) - $startTime; $logger->debug( __METHOD__ . ": ended $type #$updateId, processing time: $walltime" ); @@ -218,8 +208,7 @@ class DeferredUpdates { // Try to push the update as a job so it can run later if possible if ( $updateException && $update instanceof EnqueueableDataUpdate ) { try { - $spec = $update->getAsJobSpecification(); - $jobQueueGroupFactory->makeJobQueueGroup( $spec['domain'] )->push( $spec['job'] ); + self::getScopeStack()->queueDataUpdate( $update ); } catch ( Throwable $jobException ) { MWExceptionHandler::logException( $jobException ); $logger->error( @@ -229,7 +218,7 @@ class DeferredUpdates { 'exception' => $jobException, ] ); - $lbFactory->rollbackPrimaryChanges( __METHOD__ ); + self::getScopeStack()->onRunUpdateFailed( $update ); } } @@ -255,12 +244,6 @@ class DeferredUpdates { * (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND, DeferredUpdates::ALL) */ public static function doUpdates( $stage = self::ALL ) { - global $wgCommandLineMode; - - $httpMethod = $wgCommandLineMode - ? 'cli' - : strtolower( RequestContext::getMain()->getRequest()->getMethod() ); - /** @var ErrorPageError $guiError First presentable client-level error thrown */ $guiError = null; /** @var Throwable $exception First of any error thrown */ @@ -286,13 +269,11 @@ class DeferredUpdates { $scope->processUpdates( $stage, - static function ( DeferrableUpdate $update, $activeStage ) - use ( $httpMethod, &$guiError, &$exception ) - { + static function ( DeferrableUpdate $update, $activeStage ) use ( &$guiError, &$exception ) { $scopeStack = self::getScopeStack(); $childScope = $scopeStack->descend( $activeStage, $update ); try { - $e = self::run( $update, $httpMethod ); + $e = self::run( $update ); $guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null ); $exception = $exception ?: $e; // Any addUpdate() calls between descend() and ascend() used the sub-queue. @@ -302,10 +283,8 @@ class DeferredUpdates { // queues as appropriate... $childScope->processUpdates( $activeStage, - static function ( DeferrableUpdate $subUpdate ) - use ( $httpMethod, &$guiError, &$exception ) - { - $e = self::run( $subUpdate, $httpMethod ); + static function ( DeferrableUpdate $sub ) use ( &$guiError, &$exception ) { + $e = self::run( $sub ); $guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null ); $exception = $exception ?: $e; } @@ -332,26 +311,6 @@ class DeferredUpdates { } /** - * @return bool If a transaction round is active or connection is not ready for commit() - */ - private static function areDatabaseTransactionsActive() { - $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); - if ( $lbFactory->hasTransactionRound() - || !$lbFactory->isReadyForRoundOperations() - ) { - return true; - } - - foreach ( $lbFactory->getAllLBs() as $lb ) { - if ( $lb->hasPrimaryChanges() || $lb->explicitTrxActive() ) { - return true; - } - } - - return false; - } - - /** * Consume and execute pending updates now if possible, instead of waiting. * * In web requests, updates are always deferred until the end of the request. @@ -396,25 +355,25 @@ class DeferredUpdates { return false; } - // Run the updates for this context if they will have outer transaction scope - if ( !self::areDatabaseTransactionsActive() ) { + if ( self::getScopeStack()->allowOpportunisticUpdates() ) { self::doUpdates( self::ALL ); - return true; } if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) { // There are a large number of pending updates and none of them can run yet. - // The odds of losing updates due to an error increase when executing long queues + // The odds of losing updates due to an error increases when executing long queues // and when large amounts of time pass while tasks are queued. Mitigate this by - // trying to migrate updates to the job queue system (where applicable). + // trying to eagerly move updates to the JobQueue when possible. + // + // TODO: Do we still need this now maintenance scripts automatically call + // tryOpportunisticExecute from addUpdate, from every commit, and every + // waitForReplication call? self::getScopeStack()->current()->consumeMatchingUpdates( self::ALL, EnqueueableDataUpdate::class, static function ( EnqueueableDataUpdate $update ) { - $spec = $update->getAsJobSpecification(); - $jobQueueGroupFactory = MediaWikiServices::getInstance()->getJobQueueGroupFactory(); - $jobQueueGroupFactory->makeJobQueueGroup( $spec['domain'] )->push( $spec['job'] ); + self::getScopeStack()->queueDataUpdate( $update ); } ); } @@ -496,39 +455,13 @@ class DeferredUpdates { * transaction round. * * @param DeferrableUpdate $update - * @param ILBFactory $lbFactory * @since 1.34 */ - public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) { - $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ ); - if ( !$ticket || $lbFactory->hasTransactionRound() ) { - throw new DBTransactionError( null, "A database transaction round is pending." ); - } - - if ( $update instanceof DataUpdate ) { - $update->setTransactionTicket( $ticket ); - } - - // Designate $update::doUpdate() as the write round owner - $fnameTrxOwner = ( $update instanceof DeferrableCallback ) - ? $update->getOrigin() - : get_class( $update ) . '::doUpdate'; - // Determine whether the write round will be explicit or implicit - $useExplicitTrxRound = !( - $update instanceof TransactionRoundAwareUpdate && - $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT - ); + public static function attemptUpdate( DeferrableUpdate $update ) { + self::getScopeStack()->onRunUpdateStart( $update ); - // Flush any pending changes left over from an implicit transaction round - if ( $useExplicitTrxRound ) { - $lbFactory->beginPrimaryChanges( $fnameTrxOwner ); // new explicit round - } else { - $lbFactory->commitPrimaryChanges( $fnameTrxOwner ); // new implicit round - } - // Run the update after any stale primary DB view snapshots have been flushed $update->doUpdate(); - // Commit any pending changes from the explicit or implicit transaction round - $lbFactory->commitPrimaryChanges( $fnameTrxOwner ); - } + self::getScopeStack()->onRunUpdateEnd( $update ); + } } diff --git a/includes/deferred/DeferredUpdatesScopeMediaWikiStack.php b/includes/deferred/DeferredUpdatesScopeMediaWikiStack.php new file mode 100644 index 000000000000..add54ff337a7 --- /dev/null +++ b/includes/deferred/DeferredUpdatesScopeMediaWikiStack.php @@ -0,0 +1,133 @@ +<?php +/** + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + */ + +use MediaWiki\MediaWikiServices; +use Wikimedia\Rdbms\DBTransactionError; + +/** + * This class decouples DeferredUpdates's awareness of MediaWikiServices to ease unit testing. + * + * NOTE: As a process-level utility, it is important that MediaWikiServices::getInstance() is + * referenced explicitly each time, so as to not cache potentially stale references. + * For example after the Installer, or MediaWikiIntegrationTestCase, replace the service container. + * + * @internal For use by DeferredUpdates only + * @since 1.41 + */ +class DeferredUpdatesScopeMediaWikiStack extends DeferredUpdatesScopeStack { + + private function areDatabaseTransactionsActive(): bool { + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + if ( $lbFactory->hasTransactionRound() + || !$lbFactory->isReadyForRoundOperations() + ) { + return true; + } + + foreach ( $lbFactory->getAllLBs() as $lb ) { + if ( $lb->hasPrimaryChanges() || $lb->explicitTrxActive() ) { + return true; + } + } + + return false; + } + + public function allowOpportunisticUpdates(): bool { + global $wgCommandLineMode; + + if ( !$wgCommandLineMode ) { + // In web req + return false; + } + + // Run the updates only if they will have outer transaction scope + if ( $this->areDatabaseTransactionsActive() ) { + // transaction round is active or connection is not ready for commit() + return false; + } + + return true; + } + + public function queueDataUpdate( EnqueueableDataUpdate $update ): void { + $spec = $update->getAsJobSpecification(); + $jobQueueGroupFactory = MediaWikiServices::getInstance()->getJobQueueGroupFactory(); + $jobQueueGroupFactory->makeJobQueueGroup( $spec['domain'] )->push( $spec['job'] ); + } + + public function onRunUpdateStart( DeferrableUpdate $update ): void { + global $wgCommandLineMode; + + // Increment a counter metric + $type = get_class( $update ) + . ( $update instanceof DeferrableCallback ? '_' . $update->getOrigin() : '' ); + $httpMethod = $wgCommandLineMode ? 'cli' : strtolower( $_SERVER['REQUEST_METHOD'] ?? 'GET' ); + $stats = MediaWikiServices::getInstance()->getStatsdDataFactory(); + $stats->increment( "deferred_updates.$httpMethod.$type" ); + + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ ); + if ( !$ticket || $lbFactory->hasTransactionRound() ) { + throw new DBTransactionError( null, "A database transaction round is pending." ); + } + + if ( $update instanceof DataUpdate ) { + $update->setTransactionTicket( $ticket ); + } + + // Designate $update::doUpdate() as the write round owner + $fnameTrxOwner = ( $update instanceof DeferrableCallback ) + ? $update->getOrigin() + : get_class( $update ) . '::doUpdate'; + + // Determine whether the write round will be explicit or implicit + $useExplicitTrxRound = !( + $update instanceof TransactionRoundAwareUpdate && + $update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT + ); + + // Ensure any stale repeatable-read snapshot on the primary DB have been flushed + // before running the update. E.g. left-over from an implicit transaction round + if ( $useExplicitTrxRound ) { + // new explicit round + $lbFactory->beginPrimaryChanges( $fnameTrxOwner ); + } else { + // new implicit round + $lbFactory->commitPrimaryChanges( $fnameTrxOwner ); + } + } + + public function onRunUpdateEnd( DeferrableUpdate $update ): void { + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + + $fnameTrxOwner = ( $update instanceof DeferrableCallback ) + ? $update->getOrigin() + : get_class( $update ) . '::doUpdate'; + + // Commit any pending changes from the explicit or implicit transaction round + $lbFactory->commitPrimaryChanges( $fnameTrxOwner ); + } + + public function onRunUpdateFailed( DeferrableUpdate $update ): void { + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $lbFactory->rollbackPrimaryChanges( __METHOD__ ); + } +} diff --git a/includes/deferred/DeferredUpdatesScopeStack.php b/includes/deferred/DeferredUpdatesScopeStack.php index 043faee4e8b8..3962a4048a89 100644 --- a/includes/deferred/DeferredUpdatesScopeStack.php +++ b/includes/deferred/DeferredUpdatesScopeStack.php @@ -75,4 +75,48 @@ class DeferredUpdatesScopeStack { public function getRecursiveDepth() { return count( $this->stack ) - 1; } + + /** + * Whether DeferredUpdates::addUpdate() may run the update right away + * + * @return bool + */ + public function allowOpportunisticUpdates(): bool { + // Overridden in DeferredUpdatesScopeMediaWikiStack::allowOpportunisticUpdates + return false; + } + + /** + * Queue an EnqueueableDataUpdate as a job instead + * + * @see JobQueueGroup::push + * @param EnqueueableDataUpdate $update + */ + public function queueDataUpdate( EnqueueableDataUpdate $update ): void { + throw new LogicException( 'Cannot queue jobs from DeferredUpdates in standalone mode' ); + } + + /** + * @param DeferrableUpdate $update + */ + public function onRunUpdateStart( DeferrableUpdate $update ): void { + // No-op + // Overridden in DeferredUpdatesScopeMediaWikiStack::onRunUpdateStart + } + + /** + * @param DeferrableUpdate $update + */ + public function onRunUpdateEnd( DeferrableUpdate $update ): void { + // No-op + // Overridden in DeferredUpdatesScopeMediaWikiStack::onRunUpdateEnd + } + + /** + * @param DeferrableUpdate $update + */ + public function onRunUpdateFailed( DeferrableUpdate $update ): void { + // No-op + // Overridden in DeferredUpdatesScopeMediaWikiStack::onRunUpdateFailed + } } diff --git a/includes/deferred/RefreshSecondaryDataUpdate.php b/includes/deferred/RefreshSecondaryDataUpdate.php index 3858b3532c7c..a32b5f47bff5 100644 --- a/includes/deferred/RefreshSecondaryDataUpdate.php +++ b/includes/deferred/RefreshSecondaryDataUpdate.php @@ -100,7 +100,7 @@ class RefreshSecondaryDataUpdate extends DataUpdate $e = null; foreach ( $updates as $update ) { try { - DeferredUpdates::attemptUpdate( $update, $this->lbFactory ); + DeferredUpdates::attemptUpdate( $update ); } catch ( Exception $e ) { // Try as many updates as possible on the first pass MWExceptionHandler::rollbackPrimaryChangesAndLog( $e ); diff --git a/tests/phpunit/MediaWikiUnitTestCase.php b/tests/phpunit/MediaWikiUnitTestCase.php index e4f82990e140..0b4aaf3c1465 100644 --- a/tests/phpunit/MediaWikiUnitTestCase.php +++ b/tests/phpunit/MediaWikiUnitTestCase.php @@ -103,6 +103,8 @@ abstract class MediaWikiUnitTestCase extends TestCase { $GLOBALS[ $key ] = $value; } + // Set DeferredUpdates into standalone mode + DeferredUpdates::setScopeStack( new DeferredUpdatesScopeStack() ); MediaWikiServices::disallowGlobalInstanceInUnitTests(); } @@ -160,6 +162,7 @@ abstract class MediaWikiUnitTestCase extends TestCase { unset( $value ); MediaWikiServices::allowGlobalInstanceAfterUnitTests(); + DeferredUpdates::setScopeStack( new DeferredUpdatesScopeMediaWikiStack() ); } } diff --git a/tests/phpunit/includes/deferred/DeferredUpdatesTest.php b/tests/phpunit/includes/deferred/DeferredUpdatesTest.php index 1ed7bdb8c2b8..d53cd1d7af9d 100644 --- a/tests/phpunit/includes/deferred/DeferredUpdatesTest.php +++ b/tests/phpunit/includes/deferred/DeferredUpdatesTest.php @@ -356,8 +356,7 @@ class DeferredUpdatesTest extends MediaWikiIntegrationTestCase { $called = true; }, $fname - ), - $lbFactory + ) ); $this->assertTrue( $called, "Callback ran" ); |