current()->addUpdate( $update, $stage );

		// If CLI mode is active and no RDBMs transaction round is in the way, then run all
		// the pending updates now. This is needed for scripts that never, or rarely, use the
		// RDBMs layer, but that do modify systems via deferred updates. This logic avoids
		// excessive pending update queue sizes when long-running scripts never trigger the
		// basic RDBMs hooks for running pending updates.
		if ( $wgCommandLineMode ) {
			self::tryOpportunisticExecute( 'run' );
		}
	}

	/**
	 * Add an update to the pending update queue that invokes the specified callback when run
	 *
	 * The callback is wrapped in an MWCallableUpdate tagged with the caller of this method
	 * (via wfGetCaller()) and then handed to addUpdate().
	 *
	 * @see DeferredUpdates::addUpdate()
	 * @see MWCallableUpdate::__construct()
	 *
	 * @param callable $callable
	 * @param int $stage One of (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND)
	 * @param IDatabase|IDatabase[]|null $dbw Abort if this DB is rolled back [optional]
	 * @since 1.27 Added $stage parameter
	 * @since 1.28 Added the $dbw parameter
	 */
	public static function addCallableUpdate( $callable, $stage = self::POSTSEND, $dbw = null ) {
		// NOTE: wfGetCaller() must be invoked from this frame so that the recorded origin
		// is the code that called addCallableUpdate(); do not move it into a helper.
		self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
	}

	/**
	 * Consume and execute all pending updates
	 *
	 * Note that it is rarely the case that this method should be called outside of a few
	 * select entry points. For simplicity, that kind of recursion is discouraged. Recursion
	 * cannot happen if an explicit transaction round is active, which limits usage to updates
	 * with TRX_ROUND_ABSENT that do not leave open a transaction round of their own during
	 * the call to this method.
	 *
	 * In the less-common case of this being called within an in-progress DeferrableUpdate,
	 * this will not see any top-queue updates (since they were consumed and are being run
	 * inside an outer execution loop). In that case, it will instead operate on the sub-queue
	 * of the innermost in-progress update on the stack.
	 *
	 * The $mode parameter determines how the updates are processed. Use "run" to process the
	 * updates by running them. Otherwise, use "enqueue" to process the updates by converting
	 * the EnqueueableDataUpdate instances to jobs and running the others.
	 *
	 * @param string $mode Either "run" or "enqueue" [default: "run"]
	 * @param int $stage Which updates to process. One of
	 *  (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND, DeferredUpdates::ALL)
	 * @throws LogicException If called from within an in-progress update that is not a
	 *  TransactionRoundAwareUpdate declaring TRX_ROUND_ABSENT
	 * @throws ErrorPageError The first GUI error raised by an update, but only when $stage
	 *  is PRESEND and headers have not been sent yet
	 * @throws Throwable The first error raised by an update, but only when MW_PHPUNIT_TEST
	 *  is defined
	 * @internal For use by MediaWiki, Maintenance, JobRunner, JobExecutor
	 * @since 1.27 Added $stage parameter
	 */
	public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
		$services = MediaWikiServices::getInstance();
		$stats = $services->getStatsdDataFactory();
		$lbf = $services->getDBLoadBalancerFactory();
		$logger = LoggerFactory::getInstance( 'DeferredUpdates' );
		// Label used in the stats key: "cli" for command line mode, else the HTTP verb
		$httpMethod = $services->getMainConfig()->get( 'CommandLineMode' )
			? 'cli'
			: strtolower( RequestContext::getMain()->getRequest()->getMethod() );

		/** @var ErrorPageError $guiError First presentable client-level error thrown */
		$guiError = null;
		/** @var Throwable $exception First of any error thrown */
		$exception = null;

		$scope = self::getScopeStack()->current();

		// T249069: recursion is not possible once explicit transaction rounds are involved
		$activeUpdate = $scope->getActiveUpdate();
		if ( $activeUpdate ) {
			$class = get_class( $activeUpdate );
			if ( !( $activeUpdate instanceof TransactionRoundAwareUpdate ) ) {
				throw new LogicException(
					__METHOD__ . ": reached from $class, which is not TransactionRoundAwareUpdate"
				);
			}
			if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
				throw new LogicException(
					__METHOD__ . ": reached from $class, which does not specify TRX_ROUND_ABSENT"
				);
			}
		}

		$scope->processUpdates(
			$stage,
			function ( DeferrableUpdate $update, $activeStage ) use (
				$mode, $lbf, $logger, $stats, $httpMethod, &$guiError, &$exception
			) {
				// If applicable, just enqueue the update as a job in the job queue system
				if ( $mode === 'enqueue' && $update instanceof EnqueueableDataUpdate ) {
					self::jobify( $update, $lbf, $logger, $stats, $httpMethod );

					return;
				}

				// Otherwise, run the update....
				$scopeStack = self::getScopeStack();
				$childScope = $scopeStack->descend( $activeStage, $update );
				try {
					$e = self::run( $update, $lbf, $logger, $stats, $httpMethod );
					// Only the first GUI error and the first error of any kind are kept
					$guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
					$exception = $exception ?: $e;
					// Any addUpdate() calls between descend() and ascend() used the sub-queue.
					// In rare cases, DeferrableUpdate::doUpdate() will process them by calling
					// doUpdates() itself. In any case, run whatever updates still remain in
					// the sub-queue before leaving this scope.
					$childScope->processUpdates(
						$activeStage,
						function ( DeferrableUpdate $subUpdate ) use (
							$lbf, $logger, $stats, $httpMethod, &$guiError, &$exception
						) {
							$e = self::run( $subUpdate, $lbf, $logger, $stats, $httpMethod );
							$guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
							$exception = $exception ?: $e;
						}
					);
				} finally {
					// Always pop the child scope, even if processing threw
					$scopeStack->ascend();
				}
			}
		);

		// VW-style hack to work around T190178, so we can make sure
		// PageMetaDataUpdater doesn't throw exceptions.
		if ( $exception && defined( 'MW_PHPUNIT_TEST' ) ) {
			throw $exception;
		}

		// Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
		// callers should check permissions *before* enqueueing updates. If the main transaction
		// round actions succeed but some deferred updates fail due to permissions errors then
		// there is a risk that some secondary data was not properly updated.
		if ( $guiError && $stage === self::PRESEND && !headers_sent() ) {
			throw $guiError;
		}
	}

	/**
	 * Consume and execute all pending updates unless an update is already
	 * in progress or the LBFactory service instance has "busy" DB handles
	 *
	 * A DB handle is considered "busy" if it has an unfinished transaction that cannot safely
	 * be flushed or the parent LBFactory instance has an unfinished transaction round that
	 * cannot safely be flushed. If the number of pending updates reaches BIG_QUEUE_SIZE and
	 * there are still busy DB handles, then EnqueueableDataUpdate updates might be enqueued
	 * as jobs. This avoids excessive memory use and risk of losing updates due to failures.
	 *
	 * The $mode parameter determines how the updates are processed. Use "run" to process the
	 * updates by running them. Otherwise, use "enqueue" to process the updates by converting
	 * the EnqueueableDataUpdate instances to jobs and running the others.
	 *
	 * Note that this method operates on updates from all stages and thus should not be called
	 * during web requests. It is only intended for long-running Maintenance scripts.
	 *
	 * @param string $mode Either "run" or "enqueue" [default: "run"]
	 * @return bool Whether updates were allowed to run
	 * @internal For use by Maintenance
	 * @since 1.28
	 */
	public static function tryOpportunisticExecute( $mode = 'run' ) {
		// Leave execution up to the current loop if an update is already in progress
		if ( self::getRecursiveExecutionStackDepth() ) {
			return false;
		}

		// Run the updates for this context if they will have outer transaction scope
		if ( !self::areDatabaseTransactionsActive() ) {
			self::doUpdates( $mode, self::ALL );

			return true;
		}

		if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
			// There are a large number of pending updates and none of them can run yet.
			// The odds of losing updates due to an error increase when executing long queues
			// and when large amounts of time pass while tasks are queued. Mitigate this by
			// trying to migrate updates to the job queue system (where applicable).
			self::getScopeStack()->current()->consumeMatchingUpdates(
				self::ALL,
				EnqueueableDataUpdate::class,
				static function ( EnqueueableDataUpdate $update ) {
					$spec = $update->getAsJobSpecification();
					JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );
				}
			);
		}

		return false;
	}

	/**
	 * Get the number of pending updates for the current execution context
	 *
	 * If an update is in progress, then this operates on the sub-queues of the
	 * innermost in-progress update. Otherwise, it acts on the top-queues.
	 *
	 * @return int
	 * @since 1.28
	 */
	public static function pendingUpdatesCount() {
		return self::getScopeStack()->current()->pendingUpdatesCount();
	}

	/**
	 * Get a list of the pending updates for the current execution context
	 *
	 * If an update is in progress, then this operates on the sub-queues of the
	 * innermost in-progress update. Otherwise, it acts on the top-queues.
	 *
	 * @param int $stage Look for updates with this "defer until" stage. One of
	 *  (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND, DeferredUpdates::ALL)
	 * @return DeferrableUpdate[]
	 * @internal This method should only be used for unit tests
	 * @since 1.29
	 */
	public static function getPendingUpdates( $stage = self::ALL ) {
		return self::getScopeStack()->current()->getPendingUpdates( $stage );
	}

	/**
	 * Cancel all pending updates for the current execution context
	 *
	 * If an update is in progress, then this operates on the sub-queues of the
	 * innermost in-progress update. Otherwise, it acts on the top-queues.
	 *
	 * @internal This method should only be used for unit tests
	 */
	public static function clearPendingUpdates() {
		self::getScopeStack()->current()->clearPendingUpdates();
	}

	/**
	 * Get the number of in-progress calls to DeferredUpdates::doUpdates()
	 *
	 * @return int
	 * @internal This method should only be used for unit tests
	 */
	public static function getRecursiveExecutionStackDepth() {
		return self::getScopeStack()->getRecursiveDepth();
	}

	/**
	 * Run an update, and, if an error was thrown, catch/log it and enqueue the update as
	 * a job in the job queue system if possible (e.g. implements EnqueueableDataUpdate)
	 *
	 * @param DeferrableUpdate $update
	 * @param ILBFactory $lbFactory
	 * @param LoggerInterface $logger
	 * @param StatsdDataFactoryInterface $stats
	 * @param string $httpMethod Label used in the stats key ("cli" or an HTTP verb)
	 * @return Throwable|null The error thrown by the update, or null if it ran cleanly
	 */
	private static function run(
		DeferrableUpdate $update,
		ILBFactory $lbFactory,
		LoggerInterface $logger,
		StatsdDataFactoryInterface $stats,
		$httpMethod
	) : ?Throwable {
		$suffix = ( $update instanceof DeferrableCallback ) ? "_{$update->getOrigin()}" : '';
		$type = get_class( $update ) . $suffix;
		$stats->increment( "deferred_updates.$httpMethod.$type" );

		$updateId = spl_object_id( $update );
		$logger->debug( __METHOD__ . ": started $type #$updateId" );
		$startTime = microtime( true );
		$e = null;
		try {
			self::attemptUpdate( $update, $lbFactory );

			return null;
		} catch ( Throwable $e ) {
			// Deliberately empty: the failure is handled below, after the timing log
		} finally {
			// Runs on both the success and the failure path
			$executionTime = microtime( true ) - $startTime;
			$logger->debug(
				__METHOD__ . ": ended $type #$updateId, processing time: $executionTime"
			);
		}

		// Only reached when attemptUpdate() threw
		MWExceptionHandler::logException( $e );
		$logger->error(
			"Deferred update '{deferred_type}' failed to run.",
			[
				'deferred_type' => $type,
				'exception' => $e,
			]
		);

		$lbFactory->rollbackMasterChanges( __METHOD__ );

		// Try to push the update as a job so it can run later if possible
		if ( $update instanceof EnqueueableDataUpdate ) {
			$jobEx = null;
			try {
				$spec = $update->getAsJobSpecification();
				JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );

				return $e;
			} catch ( Throwable $jobEx ) {
				// Deliberately empty: the enqueue failure is logged below
			}

			MWExceptionHandler::logException( $jobEx );
			$logger->error(
				"Deferred update '{deferred_type}' failed to enqueue as a job.",
				[
					'deferred_type' => $type,
					'exception' => $jobEx,
				]
			);

			$lbFactory->rollbackMasterChanges( __METHOD__ );
		}

		// The original update error is what gets reported, even if enqueueing failed too
		return $e;
	}

	/**
	 * Enqueue an update as a job in the job queue system and catch/log any exceptions
	 *
	 * On failure the exception is logged and rollbackMasterChanges() is called on the
	 * given factory; nothing is re-thrown to the caller.
	 *
	 * @param EnqueueableDataUpdate $update
	 * @param ILBFactory $lbFactory
	 * @param LoggerInterface $logger
	 * @param StatsdDataFactoryInterface $stats
	 * @param string $httpMethod Label used in the stats key ("cli" or an HTTP verb)
	 */
	private static function jobify(
		EnqueueableDataUpdate $update,
		ILBFactory $lbFactory,
		LoggerInterface $logger,
		StatsdDataFactoryInterface $stats,
		$httpMethod
	) {
		$type = get_class( $update );
		$stats->increment( "deferred_updates.$httpMethod.$type" );

		$jobEx = null;
		try {
			$spec = $update->getAsJobSpecification();
			JobQueueGroup::singleton( $spec['domain'] )->push( $spec['job'] );

			return;
		} catch ( Throwable $jobEx ) {
			// Deliberately empty: the enqueue failure is logged below
		}

		MWExceptionHandler::logException( $jobEx );
		// Keep the message template constant (same placeholder style as run()); the
		// concrete class name is carried in the 'deferred_type' context key.
		$logger->error(
			"Deferred update '{deferred_type}' failed to enqueue as a job.",
			[
				'deferred_type' => $type,
				'exception' => $jobEx,
			]
		);

		$lbFactory->rollbackMasterChanges( __METHOD__ );
	}

	/**
	 * Attempt to run an update with the appropriate transaction round state it expects
	 *
	 * DeferredUpdate classes that wrap the execution of bundles of other DeferredUpdate
	 * instances can use this method to run the updates. Any such wrapper class should
	 * always use TRX_ROUND_ABSENT itself.
	 *
	 * @param DeferrableUpdate $update
	 * @param ILBFactory $lbFactory
	 * @throws DBTransactionError If no empty transaction ticket could be obtained or a
	 *  transaction round is already active
	 * @since 1.34
	 */
	public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
		$ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
		if ( !$ticket || $lbFactory->hasTransactionRound() ) {
			throw new DBTransactionError( null, "A database transaction round is pending." );
		}

		if ( $update instanceof DataUpdate ) {
			$update->setTransactionTicket( $ticket );
		}

		// Designate $update::doUpdate() as the write round owner
		$fnameTrxOwner = ( $update instanceof DeferrableCallback )
			? $update->getOrigin()
			: get_class( $update ) . '::doUpdate';
		// Determine whether the write round will be explicit or implicit.
		// Strict comparison, matching the TRX_ROUND_ABSENT check in doUpdates().
		$useExplicitTrxRound = !(
			$update instanceof TransactionRoundAwareUpdate &&
			$update->getTransactionRoundRequirement() === $update::TRX_ROUND_ABSENT
		);

		// Flush any pending changes left over from an implicit transaction round
		if ( $useExplicitTrxRound ) {
			$lbFactory->beginMasterChanges( $fnameTrxOwner ); // new explicit round
		} else {
			$lbFactory->commitMasterChanges( $fnameTrxOwner ); // new implicit round
		}

		// Run the update after any stale master view snapshots have been flushed
		$update->doUpdate();

		// Commit any pending changes from the explicit or implicit transaction round
		$lbFactory->commitMasterChanges( $fnameTrxOwner );
	}

	/**
	 * @return bool If a transaction round is active or connection is not ready for commit()
	 */
	private static function areDatabaseTransactionsActive() {
		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		if ( $lbFactory->hasTransactionRound() || !$lbFactory->isReadyForRoundOperations() ) {
			return true;
		}

		// Otherwise, look for any open master connection with pending writes/callbacks
		// or an explicit transaction
		$connsBusy = false;
		$lbFactory->forEachLB( static function ( LoadBalancer $lb ) use ( &$connsBusy ) {
			$lb->forEachOpenMasterConnection(
				static function ( IDatabase $conn ) use ( &$connsBusy ) {
					if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
						$connsBusy = true;
					}
				}
			);
		} );

		return $connsBusy;
	}

	/**
	 * Get the scope stack, creating it on first use
	 *
	 * @return DeferredUpdatesScopeStack
	 */
	private static function getScopeStack() {
		if ( self::$scopeStack === null ) {
			self::$scopeStack = new DeferredUpdatesScopeStack();
		}

		return self::$scopeStack;
	}
}