Connection parameters used by initConnection() and open() */ protected $connectionParams; /** @var string[]|int[]|float[] SQL variables values to use for all new connections */ protected $connectionVariables; /** @var int Row batch size to use for emulated INSERT SELECT queries */ protected $nonNativeInsertSelectBatchSize; /** @var bool Whether to use SSL connections */ protected $ssl; /** @var bool Whether to check for warnings */ protected $strictWarnings; /** @var array Current LoadBalancer tracking information */ protected $lbInfo = []; /** @var string|false Current SQL query delimiter */ protected $delimiter = ';'; /** @var string|bool|null Stashed value of html_errors INI setting */ private $htmlErrors; /** @var array Map of (lock name => (UNIX time,trx ID)) */ protected $sessionNamedLocks = []; /** @var array> Map of (DB name => table name => info) */ protected $sessionTempTables = []; /** @var int Affected row count for the last statement to query() */ protected $lastQueryAffectedRows = 0; /** @var int|null Insert (row) ID for the last statement to query() (null if not supported) */ protected $lastQueryInsertId; /** @var int|null Affected row count for the last query method call; null if unspecified */ protected $lastEmulatedAffectedRows; /** @var int|null Insert (row) ID for the last query method call; null if unspecified */ protected $lastEmulatedInsertId; /** @var string Last error during connection; empty string if none */ protected $lastConnectError = ''; /** @var float UNIX timestamp of the last server response */ private $lastPing = 0.0; /** @var float|null UNIX timestamp of the last committed write */ private $lastWriteTime; /** @var string|false The last PHP error from a query or connection attempt */ private $lastPhpError = false; /** @var int|null Current critical section numeric ID */ private $csmId; /** @var string|null Last critical section caller name */ private $csmFname; /** @var DBUnexpectedError|null Last unresolved critical section error */ private $csmError; /** Whether the database is a file on disk */ public const ATTR_DB_IS_FILE = 'db-is-file'; /** Lock granularity is on the level of the entire database */ public const ATTR_DB_LEVEL_LOCKING = 'db-level-locking'; /** The SCHEMA keyword refers to a grouping of tables in a database */ public const ATTR_SCHEMAS_AS_TABLE_GROUPS = 'supports-schemas'; /** New Database instance will not be connected yet when returned */ public const NEW_UNCONNECTED = 0; /** New Database instance will already be connected when returned */ public const NEW_CONNECTED = 1; /** No errors occurred during the query */ protected const ERR_NONE = 0; /** Retry query due to a connection loss detected while sending the query (session intact) */ protected const ERR_RETRY_QUERY = 1; /** Abort query (no retries) due to a statement rollback (session/transaction intact) */ protected const ERR_ABORT_QUERY = 2; /** Abort any current transaction, by rolling it back, due to an error during the query */ protected const ERR_ABORT_TRX = 4; /** Abort and reset session due to server-side session-level state loss (locks, temp tables) */ protected const ERR_ABORT_SESSION = 8; /** Assume that queries taking this long to yield connection loss errors are at fault */ protected const DROPPED_CONN_BLAME_THRESHOLD_SEC = 3.0; /** @var string Idiom used when a cancelable atomic section started the transaction */ private const NOT_APPLICABLE = 'n/a'; /** How long before it is worth doing a dummy query to test the connection */ private const PING_TTL = 1.0; /** Dummy SQL query */ private const PING_QUERY = 'SELECT 1 AS ping'; /** Hostname or IP address to use on all connections */ protected const CONN_HOST = 'host'; /** Database server username to use on all connections */ protected const CONN_USER = 'user'; /** Database server password to use on all connections */ protected const CONN_PASSWORD = 'password'; /** Database name to use on initial connection */ protected const CONN_INITIAL_DB = 'dbname'; /** Schema name to use on initial connection */ protected const CONN_INITIAL_SCHEMA = 'schema'; /** Table prefix to use on initial connection */ protected const CONN_INITIAL_TABLE_PREFIX = 'tablePrefix'; /** @var SQLPlatform */ protected $platform; /** @var ReplicationReporter */ protected $replicationReporter; /** * @note exceptions for missing libraries/drivers should be thrown in initConnection() * @param array $params Parameters passed from Database::factory() */ public function __construct( array $params ) { $this->logger = $params['logger'] ?? new NullLogger(); $this->transactionManager = new TransactionManager( $this->logger, $params['trxProfiler'] ); $this->connectionParams = [ self::CONN_HOST => ( isset( $params['host'] ) && $params['host'] !== '' ) ? $params['host'] : null, self::CONN_USER => ( isset( $params['user'] ) && $params['user'] !== '' ) ? $params['user'] : null, self::CONN_INITIAL_DB => ( isset( $params['dbname'] ) && $params['dbname'] !== '' ) ? $params['dbname'] : null, self::CONN_INITIAL_SCHEMA => ( isset( $params['schema'] ) && $params['schema'] !== '' ) ? $params['schema'] : null, self::CONN_PASSWORD => is_string( $params['password'] ) ? $params['password'] : null, self::CONN_INITIAL_TABLE_PREFIX => (string)$params['tablePrefix'] ]; $this->lbInfo = $params['lbInfo'] ?? []; $this->connectionVariables = $params['variables'] ?? []; // Set SQL mode, default is turning them all off, can be overridden or skipped with null if ( is_string( $params['sqlMode'] ?? null ) ) { $this->connectionVariables['sql_mode'] = $params['sqlMode']; } $flags = (int)$params['flags']; $this->flagsHolder = new DatabaseFlags( $flags ); $this->ssl = $params['ssl'] ?? (bool)( $flags & self::DBO_SSL ); $this->connectTimeout = $params['connectTimeout'] ?? null; $this->receiveTimeout = $params['receiveTimeout'] ?? null; $this->cliMode = (bool)$params['cliMode']; $this->agent = (string)$params['agent']; $this->serverName = $params['serverName']; $this->nonNativeInsertSelectBatchSize = $params['nonNativeInsertSelectBatchSize'] ?? 10000; $this->strictWarnings = !empty( $params['strictWarnings'] ); $this->profiler = is_callable( $params['profiler'] ) ? $params['profiler'] : null; $this->errorLogger = $params['errorLogger']; $this->deprecationLogger = $params['deprecationLogger']; $this->csProvider = $params['criticalSectionProvider'] ?? null; // Set initial dummy domain until open() sets the final DB/prefix $this->currentDomain = new DatabaseDomain( $params['dbname'] != '' ? $params['dbname'] : null, $params['schema'] != '' ? $params['schema'] : null, $params['tablePrefix'] ); $this->platform = new SQLPlatform( $this, $this->logger, $this->currentDomain, $this->errorLogger ); // Children classes must set $this->replicationReporter. } /** * Initialize the connection to the database over the wire (or to local files) * * @throws LogicException * @throws InvalidArgumentException * @throws DBConnectionError * @since 1.31 */ final public function initConnection() { if ( $this->isOpen() ) { throw new LogicException( __METHOD__ . ': already connected' ); } // Establish the connection $this->open( $this->connectionParams[self::CONN_HOST], $this->connectionParams[self::CONN_USER], $this->connectionParams[self::CONN_PASSWORD], $this->connectionParams[self::CONN_INITIAL_DB], $this->connectionParams[self::CONN_INITIAL_SCHEMA], $this->connectionParams[self::CONN_INITIAL_TABLE_PREFIX] ); $this->lastPing = microtime( true ); } /** * Open a new connection to the database (closing any existing one) * * @param string|null $server Server host/address and optional port {@see connectionParams} * @param string|null $user User name {@see connectionParams} * @param string|null $password User password {@see connectionParams} * @param string|null $db Database name * @param string|null $schema Database schema name * @param string $tablePrefix * @throws DBConnectionError */ abstract protected function open( $server, $user, $password, $db, $schema, $tablePrefix ); /** * @return array Map of (Database::ATTR_* constant => value) * @since 1.31 */ public static function getAttributes() { return []; } /** * Set the PSR-3 logger interface to use. * * @param LoggerInterface $logger */ public function setLogger( LoggerInterface $logger ) { $this->logger = $logger; } public function getServerInfo() { return $this->getServerVersion(); } public function tablePrefix( $prefix = null ) { $old = $this->currentDomain->getTablePrefix(); if ( $prefix !== null ) { $this->currentDomain = new DatabaseDomain( $this->currentDomain->getDatabase(), $this->currentDomain->getSchema(), $prefix ); $this->platform->setCurrentDomain( $this->currentDomain ); } return $old; } public function dbSchema( $schema = null ) { $old = $this->currentDomain->getSchema(); if ( $schema !== null ) { if ( $schema !== '' && $this->getDBname() === null ) { throw new DBUnexpectedError( $this, "Cannot set schema to '$schema'; no database set" ); } $this->currentDomain = new DatabaseDomain( $this->currentDomain->getDatabase(), // DatabaseDomain uses null for unspecified schemas ( $schema !== '' ) ? $schema : null, $this->currentDomain->getTablePrefix() ); $this->platform->setCurrentDomain( $this->currentDomain ); } return (string)$old; } public function getLBInfo( $name = null ) { if ( $name === null ) { return $this->lbInfo; } if ( array_key_exists( $name, $this->lbInfo ) ) { return $this->lbInfo[$name]; } return null; } public function setLBInfo( $nameOrArray, $value = null ) { if ( is_array( $nameOrArray ) ) { $this->lbInfo = $nameOrArray; } elseif ( is_string( $nameOrArray ) ) { if ( $value !== null ) { $this->lbInfo[$nameOrArray] = $value; } else { unset( $this->lbInfo[$nameOrArray] ); } } else { throw new InvalidArgumentException( "Got non-string key" ); } } public function lastDoneWrites() { return $this->lastWriteTime; } /** * @return bool * @since 1.39 * @internal For use by Database/LoadBalancer only */ public function sessionLocksPending() { return (bool)$this->sessionNamedLocks; } /** * @return ?string Owner name of explicit transaction round being participating in; null if none */ final protected function getTransactionRoundFname() { if ( $this->flagsHolder->hasImplicitTrxFlag() ) { // LoadBalancer transaction round participation is enabled for this DB handle; // get the owner of the active explicit transaction round (if any) return $this->getLBInfo( self::LB_TRX_ROUND_FNAME ); } return null; } public function isOpen() { return (bool)$this->conn; } public function getDomainID() { return $this->currentDomain->getId(); } /** * Wrapper for addslashes() * * @param string $s String to be slashed. * @return string Slashed string. */ abstract public function strencode( $s ); /** * Set a custom error handler for logging errors during database connection */ protected function installErrorHandler() { $this->lastPhpError = false; $this->htmlErrors = ini_set( 'html_errors', '0' ); set_error_handler( [ $this, 'connectionErrorLogger' ] ); } /** * Restore the previous error handler and return the last PHP error for this DB * * @return string|false */ protected function restoreErrorHandler() { restore_error_handler(); if ( $this->htmlErrors !== false ) { ini_set( 'html_errors', $this->htmlErrors ); } return $this->getLastPHPError(); } /** * @return string|false Last PHP error for this DB (typically connection errors) */ protected function getLastPHPError() { if ( $this->lastPhpError ) { $error = preg_replace( '!\[\]!', '', $this->lastPhpError ); $error = preg_replace( '!^.*?:\s?(.*)$!', '$1', $error ); return $error; } return false; } /** * Error handler for logging errors during database connection * * @internal This method should not be used outside of Database classes * * @param int|string $errno * @param string $errstr */ public function connectionErrorLogger( $errno, $errstr ) { $this->lastPhpError = $errstr; } /** * Create a log context to pass to PSR-3 logger functions. * * @param array $extras Additional data to add to context * @return array */ protected function getLogContext( array $extras = [] ) { return array_merge( [ 'db_server' => $this->getServerName(), 'db_name' => $this->getDBname(), 'db_user' => $this->connectionParams[self::CONN_USER] ?? null, ], $extras ); } final public function close( $fname = __METHOD__ ) { $error = null; // error to throw after disconnecting $wasOpen = (bool)$this->conn; // This should mostly do nothing if the connection is already closed if ( $this->conn ) { // Roll back any dangling transaction first if ( $this->trxLevel() ) { $error = $this->transactionManager->trxCheckBeforeClose( $this, $fname ); // Rollback the changes and run any callbacks as needed $this->rollback( __METHOD__, self::FLUSHING_INTERNAL ); $this->runTransactionPostRollbackCallbacks(); } // Close the actual connection in the binding handle $closed = $this->closeConnection(); } else { $closed = true; // already closed; nothing to do } $this->conn = null; // Log any unexpected errors after having disconnected if ( $error !== null ) { // T217819, T231443: this is probably just LoadBalancer trying to recover from // errors and shutdown. Log any problems and move on since the request has to // end one way or another. Throwing errors is not very useful at some point. $this->logger->error( $error, [ 'db_log_category' => 'query' ] ); } // Note that various subclasses call close() at the start of open(), which itself is // called by replaceLostConnection(). In that case, just because onTransactionResolution() // callbacks are pending does not mean that an exception should be thrown. Rather, they // will be executed after the reconnection step. if ( $wasOpen ) { // Double check that no callbacks are dangling $fnames = $this->pendingWriteAndCallbackCallers(); if ( $fnames ) { throw new RuntimeException( "Transaction callbacks are still pending: " . implode( ', ', $fnames ) ); } } return $closed; } /** * Make sure there is an open connection handle (alive or not) * * This guards against fatal errors to the binding handle not being defined in cases * where open() was never called or close() was already called. * * @throws DBUnexpectedError */ final protected function assertHasConnectionHandle() { if ( !$this->isOpen() ) { throw new DBUnexpectedError( $this, "DB connection was already closed" ); } } /** * Closes underlying database connection * @return bool Whether connection was closed successfully * @since 1.20 */ abstract protected function closeConnection(); /** * Run a query and return a QueryStatus instance with the query result information * * This is meant to handle the basic command of actually sending a query to the * server via the driver. No implicit transaction, reconnection, nor retry logic * should happen here. The higher level query() method is designed to handle those * sorts of concerns. This method should not trigger such higher level methods. * * The lastError() and lastErrno() methods should meaningfully reflect what error, * if any, occurred during the last call to this method. Methods like executeQuery(), * query(), select(), insert(), update(), delete(), and upsert() implement their calls * to doQuery() such that an immediately subsequent call to lastError()/lastErrno() * meaningfully reflects any error that occurred during that public query method call. * * For SELECT queries, the result field contains either: * - a) A driver-specific IResultWrapper describing the query results * - b) False, on any query failure * * For non-SELECT queries, the result field contains either: * - a) A driver-specific IResultWrapper, only on success * - b) True, only on success (e.g. no meaningful result other than "OK") * - c) False, on any query failure * * @param string $sql Single-statement SQL query * @return QueryStatus * @since 1.39 */ abstract protected function doSingleStatementQuery( string $sql ): QueryStatus; /** * Determine whether a write query affects a permanent table. * This includes pseudo-permanent tables. * * @param Query $query * @return bool */ private function hasPermanentTable( Query $query ) { if ( $query->getVerb() === 'CREATE TEMPORARY' ) { // Temporary table creation is allowed return false; } $table = $query->getWriteTable(); if ( $table === null ) { // Parse error? Assume permanent. return true; } [ $db, $pt ] = $this->platform->getDatabaseAndTableIdentifier( $table ); $tempInfo = $this->sessionTempTables[$db][$pt] ?? null; return !$tempInfo || $tempInfo->pseudoPermanent; } /** * Register creation and dropping of temporary tables * * @param Query $query */ protected function registerTempTables( Query $query ) { $table = $query->getWriteTable(); if ( $table === null ) { return; } switch ( $query->getVerb() ) { case 'CREATE TEMPORARY': [ $db, $pt ] = $this->platform->getDatabaseAndTableIdentifier( $table ); $this->sessionTempTables[$db][$pt] = new TempTableInfo( $this->transactionManager->getTrxId(), (bool)( $query->getFlags() & self::QUERY_PSEUDO_PERMANENT ) ); break; case 'DROP': [ $db, $pt ] = $this->platform->getDatabaseAndTableIdentifier( $table ); unset( $this->sessionTempTables[$db][$pt] ); } } public function query( $sql, $fname = __METHOD__, $flags = 0 ) { if ( !( $sql instanceof Query ) ) { $flags = (int)$flags; // b/c; this field used to be a bool $sql = QueryBuilderFromRawSql::buildQuery( $sql, $flags, $this->currentDomain->getTablePrefix() ); } else { $flags = $sql->getFlags(); } // Make sure that this caller is allowed to issue this query statement $this->assertQueryIsCurrentlyAllowed( $sql->getVerb(), $fname ); // Send the query to the server and fetch any corresponding errors $status = $this->executeQuery( $sql, $fname, $flags ); if ( $status->res === false ) { // An error occurred; log, and, if needed, report an exception. // Errors that corrupt the transaction/session state cannot be silenced. $ignore = ( $this->flagsHolder::contains( $flags, self::QUERY_SILENCE_ERRORS ) && !$this->flagsHolder::contains( $status->flags, self::ERR_ABORT_SESSION ) && !$this->flagsHolder::contains( $status->flags, self::ERR_ABORT_TRX ) ); $this->reportQueryError( $status->message, $status->code, $sql->getSQL(), $fname, $ignore ); } return $status->res; } /** * Execute a query without enforcing public (non-Database) caller restrictions. * * Retry it if there is a recoverable connection loss (e.g. no important state lost). * * This does not precheck for transaction/session state errors or critical section errors. * * @see Database::query() * * @param Query $sql SQL statement * @param string $fname Name of the calling function * @param int $flags Bit field of ISQLPlatform::QUERY_* constants * @return QueryStatus * @throws DBUnexpectedError * @since 1.34 */ final protected function executeQuery( $sql, $fname, $flags ) { $this->assertHasConnectionHandle(); $isPermWrite = false; $isWrite = $sql->isWriteQuery(); if ( $isWrite ) { ChangedTablesTracker::recordQuery( $this->currentDomain, $sql ); // Permit temporary table writes on replica connections, but require a writable // master connection for writes to persistent tables. if ( $this->hasPermanentTable( $sql ) ) { $isPermWrite = true; $info = $this->getReadOnlyReason(); if ( $info ) { [ $reason, $source ] = $info; if ( $source === 'role' ) { throw new DBReadOnlyRoleError( $this, "Database is read-only: $reason" ); } else { throw new DBReadOnlyError( $this, "Database is read-only: $reason" ); } } // DBConnRef uses QUERY_REPLICA_ROLE to enforce replica roles during query() if ( $this->flagsHolder::contains( $sql->getFlags(), self::QUERY_REPLICA_ROLE ) ) { throw new DBReadOnlyRoleError( $this, "Cannot write; target role is DB_REPLICA" ); } } } // Whether a silent retry attempt is left for recoverable connection loss errors $retryLeft = !$this->flagsHolder::contains( $flags, self::QUERY_NO_RETRY ); $cs = $this->commenceCriticalSection( __METHOD__ ); do { // Start a DBO_TRX wrapper transaction as needed (throw an error on failure) if ( $this->beginIfImplied( $sql, $fname, $flags ) ) { // Since begin() was called, any connection loss was already handled $retryLeft = false; } // Send the query statement to the server and fetch any results. $status = $this->attemptQuery( $sql, $fname, $isPermWrite ); } while ( // An error occurred that can be recovered from via query retry $this->flagsHolder::contains( $status->flags, self::ERR_RETRY_QUERY ) && // The retry has not been exhausted (consume it now) // phpcs:ignore Generic.CodeAnalysis.AssignmentInCondition.FoundInWhileCondition $retryLeft && !( $retryLeft = false ) ); // Register creation and dropping of temporary tables if ( $status->res ) { $this->registerTempTables( $sql ); } $this->completeCriticalSection( __METHOD__, $cs ); return $status; } /** * Query method wrapper handling profiling, logging, affected row count tracking, and * automatic reconnections (without retry) on query failure due to connection loss * * Note that this does not handle DBO_TRX logic. * * This method handles profiling, debug logging, reconnection and the tracking of: * - write callers * - last write time * - affected row count of the last write * - whether writes occurred in a transaction * - last successful query time (confirming that the connection was not dropped) * * @see doSingleStatementQuery() * * @param Query $sql SQL statement * @param string $fname Name of the calling function * @param bool $isPermWrite Whether it's a query writing to permanent tables * @return QueryStatus statement result * @throws DBUnexpectedError */ private function attemptQuery( $sql, string $fname, bool $isPermWrite ) { // Transaction attributes before issuing this query $priorSessInfo = new CriticalSessionInfo( $this->transactionManager->getTrxId(), $this->transactionManager->explicitTrxActive(), $this->transactionManager->pendingWriteCallers(), $this->transactionManager->pendingPreCommitCallbackCallers(), $this->sessionNamedLocks, $this->sessionTempTables ); // Get the transaction-aware SQL string used for profiling $generalizedSql = GeneralizedSql::newFromQuery( $sql, ( $this->replicationReporter->getTopologyRole() === self::ROLE_STREAMING_MASTER ) ? 'role-primary: ' : '' ); // Add agent and calling method comments to the SQL $cStatement = $this->makeCommentedSql( $sql->getSQL(), $fname ); // Start profile section $ps = $this->profiler ? ( $this->profiler )( $generalizedSql->stringify() ) : null; $startTime = microtime( true ); // Clear any overrides from a prior "query method". Note that this does not affect // any such methods that are currently invoking query() itself since those query // methods set these fields before returning. $this->lastEmulatedAffectedRows = null; $this->lastEmulatedInsertId = null; $status = $this->doSingleStatementQuery( $cStatement ); // End profile section $endTime = microtime( true ); $queryRuntime = max( $endTime - $startTime, 0.0 ); unset( $ps ); if ( $status->res !== false ) { $this->lastPing = $endTime; } $affectedRowCount = $status->rowsAffected; $returnedRowCount = $status->rowsReturned; $this->lastQueryAffectedRows = $affectedRowCount; if ( $status->res !== false ) { if ( $isPermWrite ) { if ( $this->trxLevel() ) { $this->transactionManager->transactionWritingIn( $this->getServerName(), $this->getDomainID(), $startTime ); $this->transactionManager->updateTrxWriteQueryReport( $sql->getSQL(), $queryRuntime, $affectedRowCount, $fname ); } else { $this->lastWriteTime = $endTime; } } } $this->transactionManager->recordQueryCompletion( $generalizedSql, $startTime, $isPermWrite, $isPermWrite ? $affectedRowCount : $returnedRowCount, $this->getServerName() ); // Check if the query failed... $status->flags = $this->handleErroredQuery( $status, $sql, $fname, $queryRuntime, $priorSessInfo ); // Avoid the overhead of logging calls unless debug mode is enabled if ( $this->flagsHolder->getFlag( self::DBO_DEBUG ) ) { $this->logger->debug( "{method} [{runtime_ms}ms] {db_server}: {sql}", $this->getLogContext( [ 'method' => $fname, 'sql' => $sql->getSQL(), 'domain' => $this->getDomainID(), 'runtime_ms' => round( $queryRuntime * 1000, 3 ), 'db_log_category' => 'query' ] ) ); } return $status; } private function handleErroredQuery( QueryStatus $status, $sql, $fname, $queryRuntime, $priorSessInfo ) { $errflags = self::ERR_NONE; $error = $status->message; $errno = $status->code; if ( $status->res !== false ) { // Statement succeeded return $errflags; } if ( $this->isConnectionError( $errno ) ) { // Connection lost before or during the query... // Determine how to proceed given the lost session state $connLossFlag = $this->assessConnectionLoss( $sql->getVerb(), $queryRuntime, $priorSessInfo ); // Update session state tracking and try to reestablish a connection $reconnected = $this->replaceLostConnection( $errno, __METHOD__ ); // Check if important server-side session-level state was lost if ( $connLossFlag >= self::ERR_ABORT_SESSION ) { $ex = $this->getQueryException( $error, $errno, $sql->getSQL(), $fname ); $this->transactionManager->setSessionError( $ex ); } // Check if important server-side transaction-level state was lost if ( $connLossFlag >= self::ERR_ABORT_TRX ) { $ex = $this->getQueryException( $error, $errno, $sql->getSQL(), $fname ); $this->transactionManager->setTransactionError( $ex ); } // Check if the query should be retried (having made the reconnection attempt) if ( $connLossFlag === self::ERR_RETRY_QUERY ) { $errflags |= ( $reconnected ? self::ERR_RETRY_QUERY : self::ERR_ABORT_QUERY ); } else { $errflags |= $connLossFlag; } } elseif ( $this->isKnownStatementRollbackError( $errno ) ) { // Query error triggered a server-side statement-only rollback... $errflags |= self::ERR_ABORT_QUERY; if ( $this->trxLevel() ) { // Allow legacy callers to ignore such errors via QUERY_IGNORE_DBO_TRX and // try/catch. However, a deprecation notice will be logged on the next query. $cause = [ $error, $errno, $fname ]; $this->transactionManager->setTrxStatusIgnoredCause( $cause ); } } elseif ( $this->trxLevel() ) { // Some other error occurred during the query, within a transaction... // Server-side handling of errors during transactions varies widely depending on // the RDBMS type and configuration. There are several possible results: (a) the // whole transaction is rolled back, (b) only the queries after BEGIN are rolled // back, (c) the transaction is marked as "aborted" and a ROLLBACK is required // before other queries are permitted. For compatibility reasons, pessimistically // require a ROLLBACK query (not using SAVEPOINT) before allowing other queries. $ex = $this->getQueryException( $error, $errno, $sql->getSQL(), $fname ); $this->transactionManager->setTransactionError( $ex ); $errflags |= self::ERR_ABORT_TRX; } else { // Some other error occurred during the query, without a transaction... $errflags |= self::ERR_ABORT_QUERY; } return $errflags; } /** * @param string $sql * @param string $fname * @return string */ private function makeCommentedSql( $sql, $fname ): string { // Add trace comment to the begin of the sql string, right after the operator. // Or, for one-word queries (like "BEGIN" or COMMIT") add it to the end (T44598). // NOTE: Don't add varying ids such as request id or session id to the comment. // It would break aggregation of similar queries in analysis tools (see T193050#7512149) $encName = preg_replace( '/[\x00-\x1F\/]/', '-', "$fname {$this->agent}" ); return preg_replace( '/\s|$/', " /* $encName */ ", $sql, 1 ); } /** * Start an implicit transaction if DBO_TRX is enabled and no transaction is active * * @param Query $sql SQL statement * @param string $fname * @param int $flags Bit field of ISQLPlatform::QUERY_* constants * @return bool Whether an implicit transaction was started * @throws DBError */ private function beginIfImplied( $sql, $fname, $flags ) { if ( !$this->trxLevel() && $this->flagsHolder->hasApplicableImplicitTrxFlag( $flags ) ) { if ( $this->platform->isTransactableQuery( $sql ) ) { $this->begin( __METHOD__ . " ($fname)", self::TRANSACTION_INTERNAL ); $this->transactionManager->turnOnAutomatic(); return true; } } return false; } /** * Check if callers outside of Database can run the given query given the session state * * In order to keep the DB handle's session state tracking in sync, certain queries * like "USE", "BEGIN", "COMMIT", and "ROLLBACK" must not be issued directly from * outside callers. Such commands should only be issued through dedicated methods * like selectDomain(), begin(), commit(), and rollback(), respectively. * * This also checks if the session state tracking was corrupted by a prior exception. * * @param string $verb * @param string $fname * @throws DBUnexpectedError * @throws DBTransactionStateError */ private function assertQueryIsCurrentlyAllowed( string $verb, string $fname ) { if ( $verb === 'USE' ) { throw new DBUnexpectedError( $this, "Got USE query; use selectDomain() instead" ); } if ( $verb === 'ROLLBACK' ) { // Whole transaction rollback is used for recovery // @TODO: T269161; prevent "BEGIN"/"COMMIT"/"ROLLBACK" from outside callers return; } if ( $this->csmError ) { throw new DBTransactionStateError( $this, "Cannot execute query from $fname while session state is out of sync", [], $this->csmError ); } $this->transactionManager->assertSessionStatus( $this, $fname ); if ( $verb !== 'ROLLBACK TO SAVEPOINT' ) { $this->transactionManager->assertTransactionStatus( $this, $this->deprecationLogger, $fname ); } } /** * Determine how to handle a connection lost discovered during a query attempt * * This checks if explicit transactions, pending transaction writes, and important * session-level state (locks, temp tables) was lost. Point-in-time read snapshot loss * is considered acceptable for DBO_TRX logic. * * If state was lost, but that loss was discovered during a ROLLBACK that would have * destroyed that state anyway, treat the error as recoverable. * * @param string $verb SQL query verb * @param float $walltime How many seconds passes while attempting the query * @param CriticalSessionInfo $priorSessInfo Session state just before the query * @return int Recovery approach. One of the following ERR_* class constants: * - Database::ERR_RETRY_QUERY: reconnect silently, retry query * - Database::ERR_ABORT_QUERY: reconnect silently, do not retry query * - Database::ERR_ABORT_TRX: reconnect, throw error, enforce transaction rollback * - Database::ERR_ABORT_SESSION: reconnect, throw error, enforce session rollback */ private function assessConnectionLoss( string $verb, float $walltime, CriticalSessionInfo $priorSessInfo ) { if ( $walltime < self::DROPPED_CONN_BLAME_THRESHOLD_SEC ) { // Query failed quickly; the connection was probably lost before the query was sent $res = self::ERR_RETRY_QUERY; } else { // Query took a long time; the connection was probably lost during query execution $res = self::ERR_ABORT_QUERY; } // List of problems causing session/transaction state corruption $blockers = []; // Loss of named locks breaks future callers relying on those locks for critical sections foreach ( $priorSessInfo->namedLocks as $lockName => $lockInfo ) { if ( $lockInfo['trxId'] && $lockInfo['trxId'] === $priorSessInfo->trxId ) { // Treat lost locks acquired during the lost transaction as a transaction state // problem. Connection loss on ROLLBACK (non-SAVEPOINT) is tolerable since // rollback automatically triggered server-side. if ( $verb !== 'ROLLBACK' ) { $res = max( $res, self::ERR_ABORT_TRX ); $blockers[] = "named lock '$lockName'"; } } else { // Treat lost locks acquired either during prior transactions or during no // transaction as a session state problem. $res = max( $res, self::ERR_ABORT_SESSION ); $blockers[] = "named lock '$lockName'"; } } // Loss of temp tables breaks future callers relying on those tables for queries foreach ( $priorSessInfo->tempTables as $domainTempTables ) { foreach ( $domainTempTables as $tableName => $tableInfo ) { if ( $tableInfo->trxId && $tableInfo->trxId === $priorSessInfo->trxId ) { // Treat lost temp tables created during the lost transaction as a // transaction state problem. Connection loss on ROLLBACK (non-SAVEPOINT) // is tolerable since rollback automatically triggered server-side. if ( $verb !== 'ROLLBACK' ) { $res = max( $res, self::ERR_ABORT_TRX ); $blockers[] = "temp table '$tableName'"; } } else { // Treat lost temp tables created either during prior transactions or during // no transaction as a session state problem. $res = max( $res, self::ERR_ABORT_SESSION ); $blockers[] = "temp table '$tableName'"; } } } // Loss of transaction writes breaks future callers and DBO_TRX logic relying on those // writes to be atomic and still pending. Connection loss on ROLLBACK (non-SAVEPOINT) is // tolerable since rollback automatically triggered server-side. if ( $priorSessInfo->trxWriteCallers && $verb !== 'ROLLBACK' ) { $res = max( $res, self::ERR_ABORT_TRX ); $blockers[] = 'uncommitted writes'; } if ( $priorSessInfo->trxPreCommitCbCallers && $verb !== 'ROLLBACK' ) { $res = max( $res, self::ERR_ABORT_TRX ); $blockers[] = 'pre-commit callbacks'; } if ( $priorSessInfo->trxExplicit && $verb !== 'ROLLBACK' && $verb !== 'COMMIT' ) { // Transaction automatically rolled back, breaking the expectations of callers // relying on the continued existence of that transaction for things like atomic // writes, serializability, or reads from the same point-in-time snapshot. If the // connection loss occured on ROLLBACK (non-SAVEPOINT) or COMMIT, then we do not // need to mark the transaction state as corrupt, since no transaction would still // be open even if the query did succeed (T127428). $res = max( $res, self::ERR_ABORT_TRX ); $blockers[] = 'explicit transaction'; } if ( $blockers ) { $this->logger->warning( "cannot reconnect to {db_server} silently: {error}", $this->getLogContext( [ 'error' => 'session state loss (' . implode( ', ', $blockers ) . ')', 'exception' => new RuntimeException(), 'db_log_category' => 'connection' ] ) ); } return $res; } /** * Clean things up after session (and thus transaction) loss before reconnect */ private function handleSessionLossPreconnect() { // Clean up tracking of session-level things... // https://mariadb.com/kb/en/create-table/#create-temporary-table // https://www.postgresql.org/docs/9.2/static/sql-createtable.html (ignoring ON COMMIT) $this->sessionTempTables = []; // https://mariadb.com/kb/en/get_lock/ // https://www.postgresql.org/docs/9.4/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS $this->sessionNamedLocks = []; // Session loss implies transaction loss (T67263) $this->transactionManager->onSessionLoss( $this ); // Clear additional subclass fields $this->doHandleSessionLossPreconnect(); } /** * Reset any additional subclass trx* and session* fields */ protected function doHandleSessionLossPreconnect() { // no-op } /** * Checks whether the cause of the error is detected to be a timeout. * * It returns false by default, and not all engines support detecting this yet. * If this returns false, it will be treated as a generic query error. * * @param int|string $errno Error number * @return bool * @since 1.39 */ protected function isQueryTimeoutError( $errno ) { return false; } /** * Report a query error * * If $ignore is set, emit a DEBUG level log entry and continue, * otherwise, emit an ERROR level log entry and throw an exception. * * @param string $error * @param int|string $errno * @param string $sql * @param string $fname * @param bool $ignore Whether to just log an error rather than throw an exception * @throws DBQueryError */ public function reportQueryError( $error, $errno, $sql, $fname, $ignore = false ) { if ( $ignore ) { $this->logger->debug( "SQL ERROR (ignored): $error", [ 'db_log_category' => 'query' ] ); } else { throw $this->getQueryExceptionAndLog( $error, $errno, $sql, $fname ); } } /** * @param string $error * @param string|int $errno * @param string $sql * @param string $fname * @return DBError */ private function getQueryExceptionAndLog( $error, $errno, $sql, $fname ) { // Information that instances of the same problem have in common should // not be normalized (T255202). $this->logger->error( "Error $errno from $fname, {error} {sql1line} {db_server}", $this->getLogContext( [ 'method' => __METHOD__, 'errno' => $errno, 'error' => $error, 'sql1line' => mb_substr( str_replace( "\n", "\\n", $sql ), 0, 5 * 1024 ), 'fname' => $fname, 'db_log_category' => 'query', 'exception' => new RuntimeException() ] ) ); return $this->getQueryException( $error, $errno, $sql, $fname ); } /** * @param string $error * @param string|int $errno * @param string $sql * @param string $fname * @return DBError */ private function getQueryException( $error, $errno, $sql, $fname ) { if ( $this->isQueryTimeoutError( $errno ) ) { return new DBQueryTimeoutError( $this, $error, $errno, $sql, $fname ); } elseif ( $this->isConnectionError( $errno ) ) { return new DBQueryDisconnectedError( $this, $error, $errno, $sql, $fname ); } else { return new DBQueryError( $this, $error, $errno, $sql, $fname ); } } /** * @param string $error * @return DBConnectionError */ final protected function newExceptionAfterConnectError( $error ) { // Connection was not fully initialized and is not safe for use. // Stash any error associated with the handle before destroying it. $this->lastConnectError = $error; $this->conn = null; $this->logger->error( "Error connecting to {db_server} as user {db_user}: {error}", $this->getLogContext( [ 'error' => $error, 'exception' => new RuntimeException(), 'db_log_category' => 'connection', ] ) ); return new DBConnectionError( $this, $error ); } /** * Get a SelectQueryBuilder bound to this connection. This is overridden by * DBConnRef. * * @return SelectQueryBuilder */ public function newSelectQueryBuilder(): SelectQueryBuilder { return new SelectQueryBuilder( $this ); } /** * Get a UnionQueryBuilder bound to this connection. This is overridden by * DBConnRef. * * @return UnionQueryBuilder */ public function newUnionQueryBuilder(): UnionQueryBuilder { return new UnionQueryBuilder( $this ); } /** * Get an UpdateQueryBuilder bound to this connection. This is overridden by * DBConnRef. * * @return UpdateQueryBuilder */ public function newUpdateQueryBuilder(): UpdateQueryBuilder { return new UpdateQueryBuilder( $this ); } /** * Get a DeleteQueryBuilder bound to this connection. This is overridden by * DBConnRef. * * @return DeleteQueryBuilder */ public function newDeleteQueryBuilder(): DeleteQueryBuilder { return new DeleteQueryBuilder( $this ); } /** * Get a InsertQueryBuilder bound to this connection. This is overridden by * DBConnRef. * * @return InsertQueryBuilder */ public function newInsertQueryBuilder(): InsertQueryBuilder { return new InsertQueryBuilder( $this ); } /** * Get a ReplaceQueryBuilder bound to this connection. This is overridden by * DBConnRef. * * @return ReplaceQueryBuilder */ public function newReplaceQueryBuilder(): ReplaceQueryBuilder { return new ReplaceQueryBuilder( $this ); } public function selectField( $tables, $var, $cond = '', $fname = __METHOD__, $options = [], $join_conds = [] ) { if ( $var === '*' ) { throw new DBUnexpectedError( $this, "Cannot use a * field" ); } elseif ( is_array( $var ) && count( $var ) !== 1 ) { throw new DBUnexpectedError( $this, 'Cannot use more than one field' ); } $options = $this->platform->normalizeOptions( $options ); $options['LIMIT'] = 1; $res = $this->select( $tables, $var, $cond, $fname, $options, $join_conds ); if ( $res === false ) { throw new DBUnexpectedError( $this, "Got false from select()" ); } $row = $res->fetchRow(); if ( $row === false ) { return false; } return reset( $row ); } public function selectFieldValues( $tables, $var, $cond = '', $fname = __METHOD__, $options = [], $join_conds = [] ): array { if ( $var === '*' ) { throw new DBUnexpectedError( $this, "Cannot use a * field" ); } elseif ( !is_string( $var ) ) { throw new DBUnexpectedError( $this, "Cannot use an array of fields" ); } $options = $this->platform->normalizeOptions( $options ); $res = $this->select( $tables, [ 'value' => $var ], $cond, $fname, $options, $join_conds ); if ( $res === false ) { throw new DBUnexpectedError( $this, "Got false from select()" ); } $values = []; foreach ( $res as $row ) { $values[] = $row->value; } return $values; } public function select( $tables, $vars, $conds = '', $fname = __METHOD__, $options = [], $join_conds = [] ) { $options = (array)$options; // Don't turn this into using platform directly, DatabaseMySQL overrides this. $sql = $this->selectSQLText( $tables, $vars, $conds, $fname, $options, $join_conds ); // Treat SELECT queries with FOR UPDATE as writes. This matches // how MySQL enforces read_only (FOR SHARE and LOCK IN SHADE MODE are allowed). $flags = in_array( 'FOR UPDATE', $options, true ) ? self::QUERY_CHANGE_ROWS : self::QUERY_CHANGE_NONE; $query = new Query( $sql, $flags, 'SELECT' ); return $this->query( $query, $fname ); } public function selectRow( $tables, $vars, $conds, $fname = __METHOD__, $options = [], $join_conds = [] ) { $options = (array)$options; $options['LIMIT'] = 1; $res = $this->select( $tables, $vars, $conds, $fname, $options, $join_conds ); if ( $res === false ) { throw new DBUnexpectedError( $this, "Got false from select()" ); } if ( !$res->numRows() ) { return false; } return $res->fetchObject(); } /** * @inheritDoc */ public function estimateRowCount( $tables, $var = '*', $conds = '', $fname = __METHOD__, $options = [], $join_conds = [] ): int { $conds = $this->platform->normalizeConditions( $conds, $fname ); $column = $this->platform->extractSingleFieldFromList( $var ); if ( is_string( $column ) && !in_array( $column, [ '*', '1' ] ) ) { $conds[] = "$column IS NOT NULL"; } $res = $this->select( $tables, [ 'rowcount' => 'COUNT(*)' ], $conds, $fname, $options, $join_conds ); $row = $res ? $res->fetchRow() : []; return isset( $row['rowcount'] ) ? (int)$row['rowcount'] : 0; } public function selectRowCount( $tables, $var = '*', $conds = '', $fname = __METHOD__, $options = [], $join_conds = [] ): int { $conds = $this->platform->normalizeConditions( $conds, $fname ); $column = $this->platform->extractSingleFieldFromList( $var ); if ( is_string( $column ) && !in_array( $column, [ '*', '1' ] ) ) { $conds[] = "$column IS NOT NULL"; } if ( in_array( 'DISTINCT', (array)$options ) ) { if ( $column === null ) { throw new DBUnexpectedError( $this, '$var cannot be empty when the DISTINCT option is given' ); } $innerVar = $column; } else { $innerVar = '1'; } $res = $this->select( [ 'tmp_count' => $this->platform->buildSelectSubquery( $tables, $innerVar, $conds, $fname, $options, $join_conds ) ], [ 'rowcount' => 'COUNT(*)' ], [], $fname ); $row = $res ? $res->fetchRow() : []; return isset( $row['rowcount'] ) ? (int)$row['rowcount'] : 0; } public function lockForUpdate( $table, $conds = '', $fname = __METHOD__, $options = [], $join_conds = [] ) { if ( !$this->trxLevel() && !$this->flagsHolder->hasImplicitTrxFlag() ) { throw new DBUnexpectedError( $this, __METHOD__ . ': no transaction is active nor is DBO_TRX set' ); } $options = (array)$options; $options[] = 'FOR UPDATE'; return $this->selectRowCount( $table, '*', $conds, $fname, $options, $join_conds ); } public function fieldExists( $table, $field, $fname = __METHOD__ ) { $info = $this->fieldInfo( $table, $field ); return (bool)$info; } abstract public function tableExists( $table, $fname = __METHOD__ ); public function indexExists( $table, $index, $fname = __METHOD__ ) { $info = $this->indexInfo( $table, $index, $fname ); return (bool)$info; } public function indexUnique( $table, $index, $fname = __METHOD__ ) { $info = $this->indexInfo( $table, $index, $fname ); return $info ? $info['unique'] : null; } /** * Get information about an index into an object * * @param string $table The unqualified name of a table * @param string $index Index name * @param string $fname Calling function name * @return array|false Index info map; false if it does not exist * @phan-return array{unique:bool}|false */ abstract public function indexInfo( $table, $index, $fname = __METHOD__ ); public function insert( $table, $rows, $fname = __METHOD__, $options = [] ) { $query = $this->platform->dispatchingInsertSqlText( $table, $rows, $options ); if ( !$query ) { return true; } $this->query( $query, $fname ); if ( $this->strictWarnings ) { $this->checkInsertWarnings( $query, $fname ); } return true; } /** * Check for warnings after performing an INSERT query, and throw exceptions * if necessary. * * @param Query $query * @param string $fname * @return void */ protected function checkInsertWarnings( Query $query, $fname ) { } public function update( $table, $set, $conds, $fname = __METHOD__, $options = [] ) { $query = $this->platform->updateSqlText( $table, $set, $conds, $options ); $this->query( $query, $fname ); return true; } public function databasesAreIndependent() { return false; } final public function selectDomain( $domain ) { $cs = $this->commenceCriticalSection( __METHOD__ ); try { $this->doSelectDomain( DatabaseDomain::newFromId( $domain ) ); } catch ( DBError $e ) { $this->completeCriticalSection( __METHOD__, $cs ); throw $e; } $this->completeCriticalSection( __METHOD__, $cs ); } /** * @param DatabaseDomain $domain * @throws DBConnectionError * @throws DBError * @since 1.32 */ protected function doSelectDomain( DatabaseDomain $domain ) { $this->currentDomain = $domain; $this->platform->setCurrentDomain( $this->currentDomain ); } public function getDBname() { return $this->currentDomain->getDatabase(); } public function getServer() { return $this->connectionParams[self::CONN_HOST] ?? null; } public function getServerName() { return $this->serverName ?? $this->getServer() ?? 'unknown'; } public function addQuotes( $s ) { if ( $s instanceof RawSQLValue ) { return $s->toSql(); } if ( $s instanceof Blob ) { $s = $s->fetch(); } if ( $s === null ) { return 'NULL'; } elseif ( is_bool( $s ) ) { return (string)(int)$s; } elseif ( is_int( $s ) ) { return (string)$s; } else { return "'" . $this->strencode( $s ) . "'"; } } public function expr( string $field, string $op, $value ): Expression { return new Expression( $field, $op, $value ); } public function andExpr( array $conds ): AndExpressionGroup { return AndExpressionGroup::newFromArray( $conds ); } public function orExpr( array $conds ): OrExpressionGroup { return OrExpressionGroup::newFromArray( $conds ); } public function replace( $table, $uniqueKeys, $rows, $fname = __METHOD__ ) { $uniqueKey = $this->platform->normalizeUpsertParams( $uniqueKeys, $rows ); if ( !$rows ) { return; } $affectedRowCount = 0; $insertId = null; $this->startAtomic( $fname, self::ATOMIC_CANCELABLE ); try { foreach ( $rows as $row ) { // Delete any conflicting rows (including ones inserted from $rows) $query = $this->platform->deleteSqlText( $table, [ $this->platform->makeKeyCollisionCondition( [ $row ], $uniqueKey ) ] ); $this->query( $query, $fname ); // Insert the new row $query = $this->platform->dispatchingInsertSqlText( $table, $row, [] ); $this->query( $query, $fname ); $affectedRowCount += $this->lastQueryAffectedRows; $insertId = $insertId ?: $this->lastQueryInsertId; } $this->endAtomic( $fname ); } catch ( DBError $e ) { $this->cancelAtomic( $fname ); throw $e; } $this->lastEmulatedAffectedRows = $affectedRowCount; $this->lastEmulatedInsertId = $insertId; } public function upsert( $table, array $rows, $uniqueKeys, array $set, $fname = __METHOD__ ) { $uniqueKey = $this->platform->normalizeUpsertParams( $uniqueKeys, $rows ); if ( !$rows ) { return true; } $this->platform->assertValidUpsertSetArray( $set, $uniqueKey, $rows ); $encTable = $this->tableName( $table ); $sqlColumnAssignments = $this->makeList( $set, self::LIST_SET ); // Get any AUTO_INCREMENT/SERIAL column for this table so we can set insertId() $autoIncrementColumn = $this->getInsertIdColumnForUpsert( $table ); // Check if there is a SQL assignment expression in $set (as generated by SQLPlatform::buildExcludedValue) $useWith = (bool)array_filter( $set, static function ( $v, $k ) { return $v instanceof RawSQLValue || is_int( $k ); }, ARRAY_FILTER_USE_BOTH ); // Subclasses might need explicit type casting within "WITH...AS (VALUES ...)" // so that these CTE rows can be referenced within the SET clause assigments. $typeByColumn = $useWith ? $this->getValueTypesForWithClause( $table ) : []; $first = true; $affectedRowCount = 0; $insertId = null; $this->startAtomic( $fname, self::ATOMIC_CANCELABLE ); try { foreach ( $rows as $row ) { // Update any existing conflicting row (including ones inserted from $rows) [ $sqlColumns, $sqlTuples, $sqlVals ] = $this->platform->makeInsertLists( [ $row ], '__', $typeByColumn ); $sqlConditions = $this->platform->makeKeyCollisionCondition( [ $row ], $uniqueKey ); $query = new Query( ( $useWith ? "WITH __VALS ($sqlVals) AS (VALUES $sqlTuples) " : "" ) . "UPDATE $encTable SET $sqlColumnAssignments " . "WHERE ($sqlConditions)", self::QUERY_CHANGE_ROWS, 'UPDATE', $table ); $this->query( $query, $fname ); $rowsUpdated = $this->lastQueryAffectedRows; $affectedRowCount += $rowsUpdated; if ( $rowsUpdated > 0 ) { // Conflicting row found and updated if ( $first && $autoIncrementColumn !== null ) { // @TODO: use "RETURNING" instead (when supported by SQLite) $query = new Query( "SELECT $autoIncrementColumn AS id FROM $encTable " . "WHERE ($sqlConditions)", self::QUERY_CHANGE_NONE, 'SELECT' ); $sRes = $this->query( $query, $fname, self::QUERY_CHANGE_ROWS ); $insertId = (int)$sRes->fetchRow()['id']; } } else { // No conflicting row found $query = new Query( "INSERT INTO $encTable ($sqlColumns) VALUES $sqlTuples", self::QUERY_CHANGE_ROWS, 'INSERT', $table ); $this->query( $query, $fname ); $affectedRowCount += $this->lastQueryAffectedRows; } $first = false; } $this->endAtomic( $fname ); } catch ( DBError $e ) { $this->cancelAtomic( $fname ); throw $e; } $this->lastEmulatedAffectedRows = $affectedRowCount; $this->lastEmulatedInsertId = $insertId; return true; } /** * @param string $table The unqualified name of a table * @return string|null The AUTO_INCREMENT/SERIAL column; null if not needed */ protected function getInsertIdColumnForUpsert( $table ) { return null; } /** * @param string $table The unqualified name of a table * @return array Map of (column => type); [] if not needed */ protected function getValueTypesForWithClause( $table ) { return []; } public function deleteJoin( $delTable, $joinTable, $delVar, $joinVar, $conds, $fname = __METHOD__ ) { $sql = $this->platform->deleteJoinSqlText( $delTable, $joinTable, $delVar, $joinVar, $conds ); $query = new Query( $sql, self::QUERY_CHANGE_ROWS, 'DELETE', $delTable ); $this->query( $query, $fname ); } public function delete( $table, $conds, $fname = __METHOD__ ) { $this->query( $this->platform->deleteSqlText( $table, $conds ), $fname ); return true; } final public function insertSelect( $destTable, $srcTable, $varMap, $conds, $fname = __METHOD__, $insertOptions = [], $selectOptions = [], $selectJoinConds = [] ) { static $hints = [ 'NO_AUTO_COLUMNS' ]; $insertOptions = $this->platform->normalizeOptions( $insertOptions ); $selectOptions = $this->platform->normalizeOptions( $selectOptions ); if ( $this->cliMode && $this->isInsertSelectSafe( $insertOptions, $selectOptions, $fname ) ) { // For massive migrations with downtime, we don't want to select everything // into memory and OOM, so do all this native on the server side if possible. $this->doInsertSelectNative( $destTable, $srcTable, $varMap, $conds, $fname, array_diff( $insertOptions, $hints ), $selectOptions, $selectJoinConds ); } else { $this->doInsertSelectGeneric( $destTable, $srcTable, $varMap, $conds, $fname, array_diff( $insertOptions, $hints ), $selectOptions, $selectJoinConds ); } return true; } /** * @param array $insertOptions * @param array $selectOptions * @param string $fname * @return bool Whether an INSERT SELECT with these options will be replication safe * @since 1.31 */ protected function isInsertSelectSafe( array $insertOptions, array $selectOptions, $fname ) { return true; } /** * Implementation of insertSelect() based on select() and insert() * * @see IDatabase::insertSelect() * @param string $destTable Unqualified name of destination table * @param string|array $srcTable Unqualified name of source table * @param array $varMap * @param array $conds * @param string $fname * @param array $insertOptions * @param array $selectOptions * @param array $selectJoinConds * @since 1.35 */ private function doInsertSelectGeneric( $destTable, $srcTable, array $varMap, $conds, $fname, array $insertOptions, array $selectOptions, $selectJoinConds ) { // For web requests, do a locking SELECT and then INSERT. This puts the SELECT burden // on only the primary DB (without needing row-based-replication). It also makes it easy to // know how big the INSERT is going to be. $fields = []; foreach ( $varMap as $dstColumn => $sourceColumnOrSql ) { $fields[] = $this->platform->fieldNameWithAlias( $sourceColumnOrSql, $dstColumn ); } $res = $this->select( $srcTable, implode( ',', $fields ), $conds, $fname, array_merge( $selectOptions, [ 'FOR UPDATE' ] ), $selectJoinConds ); $affectedRowCount = 0; $insertId = null; if ( $res ) { $this->startAtomic( $fname, self::ATOMIC_CANCELABLE ); try { $rows = []; foreach ( $res as $row ) { $rows[] = (array)$row; } // Avoid inserts that are too huge $rowBatches = array_chunk( $rows, $this->nonNativeInsertSelectBatchSize ); foreach ( $rowBatches as $rows ) { $query = $this->platform->dispatchingInsertSqlText( $destTable, $rows, $insertOptions ); $this->query( $query, $fname ); $affectedRowCount += $this->lastQueryAffectedRows; $insertId = $insertId ?: $this->lastQueryInsertId; } $this->endAtomic( $fname ); } catch ( DBError $e ) { $this->cancelAtomic( $fname ); throw $e; } } $this->lastEmulatedAffectedRows = $affectedRowCount; $this->lastEmulatedInsertId = $insertId; } /** * Native server-side implementation of insertSelect() for situations where * we don't want to select everything into memory * * @see IDatabase::insertSelect() * @param string $destTable The unqualified name of destination table * @param string|array $srcTable The unqualified name of source table * @param array $varMap * @param array $conds * @param string $fname * @param array $insertOptions * @param array $selectOptions * @param array $selectJoinConds * @since 1.35 */ protected function doInsertSelectNative( $destTable, $srcTable, array $varMap, $conds, $fname, array $insertOptions, array $selectOptions, $selectJoinConds ) { $sql = $this->platform->insertSelectNativeSqlText( $destTable, $srcTable, $varMap, $conds, $fname, $insertOptions, $selectOptions, $selectJoinConds ); $query = new Query( $sql, self::QUERY_CHANGE_ROWS, 'INSERT', $destTable ); $this->query( $query, $fname ); } /** * Do not use this method outside of Database/DBError classes * * @param int|string $errno * @return bool Whether the given query error was a connection drop * @since 1.38 */ protected function isConnectionError( $errno ) { return false; } /** * @param int|string $errno * @return bool Whether it is known that the last query error only caused statement rollback * @note This is for backwards compatibility for callers catching DBError exceptions in * order to ignore problems like duplicate key errors or foreign key violations * @since 1.39 */ protected function isKnownStatementRollbackError( $errno ) { return false; // don't know; it could have caused a transaction rollback } /** * @inheritDoc */ public function serverIsReadOnly() { return false; } final public function onTransactionResolution( callable $callback, $fname = __METHOD__ ) { $this->transactionManager->onTransactionResolution( $this, $callback, $fname ); } final public function onTransactionCommitOrIdle( callable $callback, $fname = __METHOD__ ) { if ( !$this->trxLevel() && $this->getTransactionRoundFname() !== null ) { // This DB handle is set to participate in LoadBalancer transaction rounds and // an explicit transaction round is active. Start an implicit transaction on this // DB handle (setting trxAutomatic) similar to how query() does in such situations. $this->begin( __METHOD__, self::TRANSACTION_INTERNAL ); } $this->transactionManager->addPostCommitOrIdleCallback( $callback, $fname ); if ( !$this->trxLevel() ) { $dbErrors = []; $this->runOnTransactionIdleCallbacks( self::TRIGGER_IDLE, $dbErrors ); if ( $dbErrors ) { throw $dbErrors[0]; } } } final public function onTransactionPreCommitOrIdle( callable $callback, $fname = __METHOD__ ) { if ( !$this->trxLevel() && $this->getTransactionRoundFname() !== null ) { // This DB handle is set to participate in LoadBalancer transaction rounds and // an explicit transaction round is active. Start an implicit transaction on this // DB handle (setting trxAutomatic) similar to how query() does in such situations. $this->begin( __METHOD__, self::TRANSACTION_INTERNAL ); } if ( $this->trxLevel() ) { $this->transactionManager->addPreCommitOrIdleCallback( $callback, $fname ); } else { // No transaction is active nor will start implicitly, so make one for this callback $this->startAtomic( __METHOD__, self::ATOMIC_CANCELABLE ); try { $callback( $this ); } catch ( Throwable $e ) { // Avoid confusing error reporting during critical section errors if ( !$this->csmError ) { $this->cancelAtomic( __METHOD__ ); } throw $e; } $this->endAtomic( __METHOD__ ); } } final public function setTransactionListener( $name, ?callable $callback = null ) { $this->transactionManager->setTransactionListener( $name, $callback ); } /** * Whether to disable running of post-COMMIT/ROLLBACK callbacks * * @internal This method should not be used outside of Database/LoadBalancer * * @since 1.28 * @param bool $suppress */ final public function setTrxEndCallbackSuppression( $suppress ) { $this->transactionManager->setTrxEndCallbackSuppression( $suppress ); } /** * Consume and run any "on transaction idle/resolution" callbacks * * @internal This method should not be used outside of Database/LoadBalancer * * @since 1.20 * @param int $trigger IDatabase::TRIGGER_* constant * @param DBError[] &$errors DB exceptions caught [returned] * @return int Number of callbacks attempted * @throws DBUnexpectedError * @throws Throwable Any non-DBError exception thrown by a callback */ public function runOnTransactionIdleCallbacks( $trigger, array &$errors = [] ) { if ( $this->trxLevel() ) { throw new DBUnexpectedError( $this, __METHOD__ . ': a transaction is still open' ); } if ( $this->transactionManager->isEndCallbacksSuppressed() ) { // Execution deferred by LoadBalancer for explicit execution later return 0; } $cs = $this->commenceCriticalSection( __METHOD__ ); $count = 0; $autoTrx = $this->flagsHolder->hasImplicitTrxFlag(); // automatic begin() enabled? // Drain the queues of transaction "idle" and "end" callbacks until they are empty do { $callbackEntries = $this->transactionManager->consumeEndCallbacks(); $count += count( $callbackEntries ); foreach ( $callbackEntries as $entry ) { $this->flagsHolder->clearFlag( self::DBO_TRX ); // make each query its own transaction try { $entry[0]( $trigger, $this ); } catch ( DBError $ex ) { call_user_func( $this->errorLogger, $ex ); $errors[] = $ex; // Some callbacks may use startAtomic/endAtomic, so make sure // their transactions are ended so other callbacks don't fail if ( $this->trxLevel() ) { $this->rollback( __METHOD__, self::FLUSHING_INTERNAL ); } } finally { if ( $autoTrx ) { $this->flagsHolder->setFlag( self::DBO_TRX ); // restore automatic begin() } else { $this->flagsHolder->clearFlag( self::DBO_TRX ); // restore auto-commit } } } } while ( $this->transactionManager->countPostCommitOrIdleCallbacks() ); $this->completeCriticalSection( __METHOD__, $cs ); return $count; } /** * Actually run any "transaction listener" callbacks * * @internal This method should not be used outside of Database/LoadBalancer * * @since 1.20 * @param int $trigger IDatabase::TRIGGER_* constant * @param DBError[] &$errors DB exceptions caught [returned] * @throws Throwable Any non-DBError exception thrown by a callback */ public function runTransactionListenerCallbacks( $trigger, array &$errors = [] ) { if ( $this->transactionManager->isEndCallbacksSuppressed() ) { // Execution deferred by LoadBalancer for explicit execution later return; } // These callbacks should only be registered in setup, thus no iteration is needed foreach ( $this->transactionManager->getRecurringCallbacks() as $callback ) { try { $callback( $trigger, $this ); } catch ( DBError $ex ) { ( $this->errorLogger )( $ex ); $errors[] = $ex; } } } /** * Handle "on transaction idle/resolution" and "transaction listener" callbacks post-COMMIT * * @throws DBError The first DBError exception thrown by a callback * @throws Throwable Any non-DBError exception thrown by a callback */ private function runTransactionPostCommitCallbacks() { $dbErrors = []; $this->runOnTransactionIdleCallbacks( self::TRIGGER_COMMIT, $dbErrors ); $this->runTransactionListenerCallbacks( self::TRIGGER_COMMIT, $dbErrors ); $this->lastEmulatedAffectedRows = 0; // for the sake of consistency if ( $dbErrors ) { throw $dbErrors[0]; } } /** * Handle "on transaction idle/resolution" and "transaction listener" callbacks post-ROLLBACK * * This will suppress and log any DBError exceptions * * @throws Throwable Any non-DBError exception thrown by a callback */ private function runTransactionPostRollbackCallbacks() { $this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK ); $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK ); $this->lastEmulatedAffectedRows = 0; // for the sake of consistency } final public function startAtomic( $fname = __METHOD__, $cancelable = self::ATOMIC_NOT_CANCELABLE ) { $cs = $this->commenceCriticalSection( __METHOD__ ); if ( $this->trxLevel() ) { // This atomic section is only one part of a larger transaction $sectionOwnsTrx = false; } else { // Start an implicit transaction (sets trxAutomatic) try { $this->begin( $fname, self::TRANSACTION_INTERNAL ); } catch ( DBError $e ) { $this->completeCriticalSection( __METHOD__, $cs ); throw $e; } if ( $this->flagsHolder->hasImplicitTrxFlag() ) { // This DB handle participates in LoadBalancer transaction rounds; all atomic // sections should be buffered into one transaction (e.g. to keep web requests // transactional). Note that an implicit transaction round is considered to be // active when no there is no explicit transaction round. $sectionOwnsTrx = false; } else { // This DB handle does not participate in LoadBalancer transaction rounds; // each topmost atomic section will use its own transaction. $sectionOwnsTrx = true; } $this->transactionManager->setAutomaticAtomic( $sectionOwnsTrx ); } if ( $cancelable === self::ATOMIC_CANCELABLE ) { if ( $sectionOwnsTrx ) { // This atomic section is synonymous with the whole transaction; just // use full COMMIT/ROLLBACK in endAtomic()/cancelAtomic(), respectively $savepointId = self::NOT_APPLICABLE; } else { // This atomic section is only part of the whole transaction; use a SAVEPOINT // query so that its changes can be cancelled without losing the rest of the // transaction (e.g. changes from other sections or from outside of sections) try { $savepointId = $this->transactionManager->nextSavePointId( $this, $fname ); $sql = $this->platform->savepointSqlText( $savepointId ); $query = new Query( $sql, self::QUERY_CHANGE_TRX, 'SAVEPOINT' ); $this->query( $query, $fname ); } catch ( DBError $e ) { $this->completeCriticalSection( __METHOD__, $cs, $e ); throw $e; } } } else { $savepointId = null; } $sectionId = new AtomicSectionIdentifier; $this->transactionManager->addToAtomicLevels( $fname, $sectionId, $savepointId ); $this->completeCriticalSection( __METHOD__, $cs ); return $sectionId; } final public function endAtomic( $fname = __METHOD__ ) { [ $savepointId, $sectionId ] = $this->transactionManager->onEndAtomic( $this, $fname ); $runPostCommitCallbacks = false; $cs = $this->commenceCriticalSection( __METHOD__ ); // Remove the last section (no need to re-index the array) $finalLevelOfImplicitTrxPopped = $this->transactionManager->popAtomicLevel(); try { if ( $finalLevelOfImplicitTrxPopped ) { $this->commit( $fname, self::FLUSHING_INTERNAL ); $runPostCommitCallbacks = true; } elseif ( $savepointId !== null && $savepointId !== self::NOT_APPLICABLE ) { $sql = $this->platform->releaseSavepointSqlText( $savepointId ); $query = new Query( $sql, self::QUERY_CHANGE_TRX, 'RELEASE SAVEPOINT' ); $this->query( $query, $fname ); } } catch ( DBError $e ) { $this->completeCriticalSection( __METHOD__, $cs, $e ); throw $e; } $this->transactionManager->onEndAtomicInCriticalSection( $sectionId ); $this->completeCriticalSection( __METHOD__, $cs ); if ( $runPostCommitCallbacks ) { $this->runTransactionPostCommitCallbacks(); } } final public function cancelAtomic( $fname = __METHOD__, ?AtomicSectionIdentifier $sectionId = null ) { $this->transactionManager->onCancelAtomicBeforeCriticalSection( $this, $fname ); $pos = $this->transactionManager->getPositionFromSectionId( $sectionId ); if ( $pos < 0 ) { throw new DBUnexpectedError( $this, "Atomic section not found (for $fname)" ); } $cs = $this->commenceCriticalSection( __METHOD__ ); $runPostRollbackCallbacks = false; [ $savedFname, $excisedSectionIds, $newTopSectionId, $savedSectionId, $savepointId ] = $this->transactionManager->cancelAtomic( $pos ); try { if ( $savedFname !== $fname ) { $e = new DBUnexpectedError( $this, "Invalid atomic section ended (got $fname but expected $savedFname)" ); $this->completeCriticalSection( __METHOD__, $cs, $e ); throw $e; } // Remove the last section (no need to re-index the array) $this->transactionManager->popAtomicLevel(); $excisedSectionIds[] = $savedSectionId; $newTopSectionId = $this->transactionManager->currentAtomicSectionId(); if ( $savepointId !== null ) { // Rollback the transaction changes proposed within this atomic section if ( $savepointId === self::NOT_APPLICABLE ) { // Atomic section started the transaction; rollback the whole transaction // and trigger cancellation callbacks for all active atomic sections $this->rollback( $fname, self::FLUSHING_INTERNAL ); $runPostRollbackCallbacks = true; } else { // Atomic section nested within the transaction; rollback the transaction // to the state prior to this section and trigger its cancellation callbacks $sql = $this->platform->rollbackToSavepointSqlText( $savepointId ); $query = new Query( $sql, self::QUERY_CHANGE_TRX, 'ROLLBACK TO SAVEPOINT' ); $this->query( $query, $fname ); $this->transactionManager->setTrxStatusToOk(); // no exception; recovered } } else { // Put the transaction into an error state if it's not already in one $trxError = new DBUnexpectedError( $this, "Uncancelable atomic section canceled (got $fname)" ); $this->transactionManager->setTransactionError( $trxError ); } } finally { // Fix up callbacks owned by the sections that were just cancelled. // All callbacks should have an owner that is present in trxAtomicLevels. $this->transactionManager->modifyCallbacksForCancel( $excisedSectionIds, $newTopSectionId ); } $this->lastEmulatedAffectedRows = 0; // for the sake of consistency $this->completeCriticalSection( __METHOD__, $cs ); if ( $runPostRollbackCallbacks ) { $this->runTransactionPostRollbackCallbacks(); } } final public function doAtomicSection( $fname, callable $callback, $cancelable = self::ATOMIC_NOT_CANCELABLE ) { $sectionId = $this->startAtomic( $fname, $cancelable ); try { $res = $callback( $this, $fname ); } catch ( Throwable $e ) { // Avoid confusing error reporting during critical section errors if ( !$this->csmError ) { $this->cancelAtomic( $fname, $sectionId ); } throw $e; } $this->endAtomic( $fname ); return $res; } final public function begin( $fname = __METHOD__, $mode = self::TRANSACTION_EXPLICIT ) { static $modes = [ self::TRANSACTION_EXPLICIT, self::TRANSACTION_INTERNAL ]; if ( !in_array( $mode, $modes, true ) ) { throw new DBUnexpectedError( $this, "$fname: invalid mode parameter '$mode'" ); } $this->transactionManager->onBegin( $this, $fname ); if ( $this->flagsHolder->hasImplicitTrxFlag() && $mode !== self::TRANSACTION_INTERNAL ) { $msg = "$fname: implicit transaction expected (DBO_TRX set)"; throw new DBUnexpectedError( $this, $msg ); } $this->assertHasConnectionHandle(); $cs = $this->commenceCriticalSection( __METHOD__ ); $timeStart = microtime( true ); try { $this->doBegin( $fname ); } catch ( DBError $e ) { $this->completeCriticalSection( __METHOD__, $cs ); throw $e; } $timeEnd = microtime( true ); // Treat "BEGIN" as a trivial query to gauge the RTT delay $rtt = max( $timeEnd - $timeStart, 0.0 ); $this->transactionManager->onBeginInCriticalSection( $mode, $fname, $rtt ); $this->replicationReporter->resetReplicationLagStatus( $this ); $this->completeCriticalSection( __METHOD__, $cs ); } /** * Issues the BEGIN command to the database server. * * @see Database::begin() * @param string $fname * @throws DBError */ protected function doBegin( $fname ) { $query = new Query( 'BEGIN', self::QUERY_CHANGE_TRX, 'BEGIN' ); $this->query( $query, $fname ); } final public function commit( $fname = __METHOD__, $flush = self::FLUSHING_ONE ) { static $modes = [ self::FLUSHING_ONE, self::FLUSHING_ALL_PEERS, self::FLUSHING_INTERNAL ]; if ( !in_array( $flush, $modes, true ) ) { throw new DBUnexpectedError( $this, "$fname: invalid flush parameter '$flush'" ); } if ( !$this->transactionManager->onCommit( $this, $fname, $flush ) ) { return; } $this->assertHasConnectionHandle(); $this->runOnTransactionPreCommitCallbacks(); $cs = $this->commenceCriticalSection( __METHOD__ ); try { if ( $this->trxLevel() ) { $query = new Query( 'COMMIT', self::QUERY_CHANGE_TRX, 'COMMIT' ); $this->query( $query, $fname ); } } catch ( DBError $e ) { $this->completeCriticalSection( __METHOD__, $cs ); throw $e; } $lastWriteTime = $this->transactionManager->onCommitInCriticalSection( $this ); if ( $lastWriteTime ) { $this->lastWriteTime = $lastWriteTime; } // With FLUSHING_ALL_PEERS, callbacks will run when requested by a dedicated phase // within LoadBalancer. With FLUSHING_INTERNAL, callbacks will run when requested by // the Database caller during a safe point. This avoids isolation and recursion issues. if ( $flush === self::FLUSHING_ONE ) { $this->runTransactionPostCommitCallbacks(); } $this->completeCriticalSection( __METHOD__, $cs ); } final public function rollback( $fname = __METHOD__, $flush = self::FLUSHING_ONE ) { if ( $flush !== self::FLUSHING_INTERNAL && $flush !== self::FLUSHING_ALL_PEERS && $this->flagsHolder->hasImplicitTrxFlag() ) { throw new DBUnexpectedError( $this, "$fname: Expected mass rollback of all peer transactions (DBO_TRX set)" ); } if ( !$this->trxLevel() ) { $this->transactionManager->setTrxStatusToNone(); $this->transactionManager->clearPreEndCallbacks(); if ( $this->transactionManager->trxLevel() === TransactionManager::STATUS_TRX_ERROR ) { $this->logger->info( "$fname: acknowledged server-side transaction loss on {db_server}", $this->getLogContext() ); } return; } $this->assertHasConnectionHandle(); if ( $this->csmError ) { // Since the session state is corrupt, we cannot just rollback the transaction // while preserving the non-transaction session state. The handle will remain // marked as corrupt until flushSession() is called to reset the connection // and deal with any remaining callbacks. $this->logger->info( "$fname: acknowledged client-side transaction loss on {db_server}", $this->getLogContext() ); return; } $cs = $this->commenceCriticalSection( __METHOD__ ); if ( $this->trxLevel() ) { // Disconnects cause rollback anyway, so ignore those errors $query = new Query( $this->platform->rollbackSqlText(), self::QUERY_SILENCE_ERRORS | self::QUERY_CHANGE_TRX, 'ROLLBACK' ); $this->query( $query, $fname ); } $this->transactionManager->onRollbackInCriticalSection( $this ); // With FLUSHING_ALL_PEERS, callbacks will run when requested by a dedicated phase // within LoadBalancer. With FLUSHING_INTERNAL, callbacks will run when requested by // the Database caller during a safe point. This avoids isolation and recursion issues. if ( $flush === self::FLUSHING_ONE ) { $this->runTransactionPostRollbackCallbacks(); } $this->completeCriticalSection( __METHOD__, $cs ); } /** * @internal Only for tests and highly discouraged * @param TransactionManager $transactionManager */ public function setTransactionManager( TransactionManager $transactionManager ) { $this->transactionManager = $transactionManager; } public function flushSession( $fname = __METHOD__, $flush = self::FLUSHING_ONE ) { if ( $flush !== self::FLUSHING_INTERNAL && $flush !== self::FLUSHING_ALL_PEERS && $this->flagsHolder->hasImplicitTrxFlag() ) { throw new DBUnexpectedError( $this, "$fname: Expected mass flush of all peer connections (DBO_TRX set)" ); } if ( $this->csmError ) { // If a critical section error occurred, such as Excimer timeout exceptions raised // before a query response was marshalled, destroy the connection handle and reset // the session state tracking variables. The value of trxLevel() is irrelevant here, // and, in fact, might be 1 due to rollback() deferring critical section recovery. $this->logger->info( "$fname: acknowledged client-side session loss on {db_server}", $this->getLogContext() ); $this->csmError = null; $this->csmFname = null; $this->replaceLostConnection( 2048, __METHOD__ ); return; } if ( $this->trxLevel() ) { // Any existing transaction should have been rolled back already throw new DBUnexpectedError( $this, "$fname: transaction still in progress (not yet rolled back)" ); } if ( $this->transactionManager->sessionStatus() === TransactionManager::STATUS_SESS_ERROR ) { // If the session state was already lost due to either an unacknowledged session // state loss error (e.g. dropped connection) or an explicit connection close call, // then there is nothing to do here. Note that in such cases, even temporary tables // and server-side config variables are lost (invocation of this method is assumed // to imply that such losses are tolerable). $this->logger->info( "$fname: acknowledged server-side session loss on {db_server}", $this->getLogContext() ); } elseif ( $this->isOpen() ) { // Connection handle exists; server-side session state must be flushed $this->doFlushSession( $fname ); $this->sessionNamedLocks = []; } $this->transactionManager->clearSessionError(); } /** * Reset the server-side session state for named locks and table locks * * Connection and query errors will be suppressed and logged * * @param string $fname * @since 1.38 */ protected function doFlushSession( $fname ) { // no-op } public function flushSnapshot( $fname = __METHOD__, $flush = self::FLUSHING_ONE ) { $this->transactionManager->onFlushSnapshot( $this, $fname, $flush, $this->getTransactionRoundFname() ); if ( $this->transactionManager->sessionStatus() === TransactionManager::STATUS_SESS_ERROR || $this->transactionManager->trxStatus() === TransactionManager::STATUS_TRX_ERROR ) { $this->rollback( $fname, self::FLUSHING_INTERNAL ); } else { $this->commit( $fname, self::FLUSHING_INTERNAL ); } } public function duplicateTableStructure( $oldName, $newName, $temporary = false, $fname = __METHOD__ ) { throw new RuntimeException( __METHOD__ . ' is not implemented in descendant class' ); } public function listTables( $prefix = null, $fname = __METHOD__ ) { throw new RuntimeException( __METHOD__ . ' is not implemented in descendant class' ); } public function affectedRows() { $this->lastEmulatedAffectedRows ??= $this->lastQueryAffectedRows; return $this->lastEmulatedAffectedRows; } public function insertId() { if ( $this->lastEmulatedInsertId === null ) { // Guard against misuse of this method by checking affectedRows(). Note that calls // to insert() with "IGNORE" and calls to insertSelect() might not add any rows. if ( $this->affectedRows() ) { $this->lastEmulatedInsertId = $this->lastInsertId(); } else { $this->lastEmulatedInsertId = 0; } } return $this->lastEmulatedInsertId; } /** * Get a row ID from the last insert statement to implicitly assign one within the session * * If the statement involved assigning sequence IDs to multiple rows, then the return value * will be any one of those values (database-specific). If the statement was an "UPSERT" and * some existing rows were updated, then the result will either reflect only IDs of created * rows or it will reflect IDs of both created and updated rows (this is database-specific). * * The result is unspecified if the statement gave an error. * * @return int Sequence ID, 0 (if none) * @throws DBError */ abstract protected function lastInsertId(); public function ping() { if ( $this->isOpen() ) { // If the connection was recently used, assume that it is still good if ( ( microtime( true ) - $this->lastPing ) < self::PING_TTL ) { return true; } // Send a trivial query to test the connection, triggering an automatic // reconnection attempt if the connection was lost $query = new Query( self::PING_QUERY, self::QUERY_IGNORE_DBO_TRX | self::QUERY_SILENCE_ERRORS | self::QUERY_CHANGE_NONE, 'SELECT' ); $res = $this->query( $query, __METHOD__ ); $ok = ( $res !== false ); } else { // Try to re-establish a connection $ok = $this->replaceLostConnection( null, __METHOD__ ); } return $ok; } /** * Close any existing (dead) database connection and open a new connection * * @param int|null $lastErrno * @param string $fname * @return bool True if new connection is opened successfully, false if error */ protected function replaceLostConnection( $lastErrno, $fname ) { if ( $this->conn ) { $this->closeConnection(); $this->conn = null; $this->handleSessionLossPreconnect(); } try { $this->open( $this->connectionParams[self::CONN_HOST], $this->connectionParams[self::CONN_USER], $this->connectionParams[self::CONN_PASSWORD], $this->currentDomain->getDatabase(), $this->currentDomain->getSchema(), $this->tablePrefix() ); $this->lastPing = microtime( true ); $ok = true; $this->logger->warning( $fname . ': lost connection to {db_server} with error {errno}; reconnected', $this->getLogContext( [ 'exception' => new RuntimeException(), 'db_log_category' => 'connection', 'errno' => $lastErrno ] ) ); } catch ( DBConnectionError $e ) { $ok = false; $this->logger->error( $fname . ': lost connection to {db_server} with error {errno}; reconnection failed: {connect_msg}', $this->getLogContext( [ 'exception' => new RuntimeException(), 'db_log_category' => 'connection', 'errno' => $lastErrno, 'connect_msg' => $e->getMessage() ] ) ); } // Handle callbacks in trxEndCallbacks, e.g. onTransactionResolution(). // If callback suppression is set then the array will remain unhandled. $this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK ); // Handle callbacks in trxRecurringCallbacks, e.g. setTransactionListener(). // If callback suppression is set then the array will remain unhandled. $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK ); return $ok; } /** * Merge the result of getSessionLagStatus() for several DBs * using the most pessimistic values to estimate the lag of * any data derived from them in combination * * This is information is useful for caching modules * * @see WANObjectCache::set() * @see WANObjectCache::getWithSetCallback() * * @param IReadableDatabase|null ...$dbs * Note: For backward compatibility, it is allowed for null values * to be passed among the parameters. This is deprecated since 1.36, * only IReadableDatabase objects should be passed. * * @return array Map of values: * - lag: highest lag of any of the DBs or false on error (e.g. replication stopped) * - since: oldest UNIX timestamp of any of the DB lag estimates * - pending: whether any of the DBs have uncommitted changes * @throws DBError * @since 1.27 */ public static function getCacheSetOptions( ?IReadableDatabase ...$dbs ) { $res = [ 'lag' => 0, 'since' => INF, 'pending' => false ]; foreach ( func_get_args() as $db ) { if ( $db instanceof IReadableDatabase ) { $status = $db->getSessionLagStatus(); if ( $status['lag'] === false ) { $res['lag'] = false; } elseif ( $res['lag'] !== false ) { $res['lag'] = max( $res['lag'], $status['lag'] ); } $res['since'] = min( $res['since'], $status['since'] ); } if ( $db instanceof IDatabaseForOwner ) { $res['pending'] = $res['pending'] ?: $db->writesPending(); } } return $res; } public function encodeBlob( $b ) { return $b; } public function decodeBlob( $b ) { if ( $b instanceof Blob ) { $b = $b->fetch(); } return $b; } public function setSessionOptions( array $options ) { } public function sourceFile( $filename, ?callable $lineCallback = null, ?callable $resultCallback = null, $fname = false, ?callable $inputCallback = null ) { AtEase::suppressWarnings(); $fp = fopen( $filename, 'r' ); AtEase::restoreWarnings(); if ( $fp === false ) { throw new RuntimeException( "Could not open \"{$filename}\"" ); } if ( !$fname ) { $fname = __METHOD__ . "( $filename )"; } try { return $this->sourceStream( $fp, $lineCallback, $resultCallback, $fname, $inputCallback ); } finally { fclose( $fp ); } } public function sourceStream( $fp, ?callable $lineCallback = null, ?callable $resultCallback = null, $fname = __METHOD__, ?callable $inputCallback = null ) { $delimiterReset = new ScopedCallback( function ( $delimiter ) { $this->delimiter = $delimiter; }, [ $this->delimiter ] ); $cmd = ''; while ( !feof( $fp ) ) { if ( $lineCallback ) { call_user_func( $lineCallback ); } $line = trim( fgets( $fp ) ); if ( $line == '' ) { continue; } if ( $line[0] == '-' && $line[1] == '-' ) { continue; } if ( $cmd != '' ) { $cmd .= ' '; } $done = $this->streamStatementEnd( $cmd, $line ); $cmd .= "$line\n"; if ( $done || feof( $fp ) ) { $cmd = $this->platform->replaceVars( $cmd ); if ( $inputCallback ) { $callbackResult = $inputCallback( $cmd ); if ( is_string( $callbackResult ) || !$callbackResult ) { $cmd = $callbackResult; } } if ( $cmd ) { $res = $this->query( $cmd, $fname ); if ( $resultCallback ) { $resultCallback( $res, $this ); } if ( $res === false ) { $err = $this->lastError(); return "Query \"{$cmd}\" failed with error code \"$err\".\n"; } } $cmd = ''; } } ScopedCallback::consume( $delimiterReset ); return true; } /** * Called by sourceStream() to check if we've reached a statement end * * @param string &$sql SQL assembled so far * @param string &$newLine New line about to be added to $sql * @return bool Whether $newLine contains end of the statement */ public function streamStatementEnd( &$sql, &$newLine ) { if ( $this->delimiter ) { $prev = $newLine; $newLine = preg_replace( '/' . preg_quote( $this->delimiter, '/' ) . '$/', '', $newLine ); if ( $newLine != $prev ) { return true; } } return false; } /** * @inheritDoc */ public function lockIsFree( $lockName, $method ) { // RDBMs methods for checking named locks may or may not count this thread itself. // In MySQL, IS_FREE_LOCK() returns 0 if the thread already has the lock. This is // the behavior chosen by the interface for this method. if ( isset( $this->sessionNamedLocks[$lockName] ) ) { $lockIsFree = false; } else { $lockIsFree = $this->doLockIsFree( $lockName, $method ); } return $lockIsFree; } /** * @see lockIsFree() * * @param string $lockName * @param string $method * @return bool Success * @throws DBError */ protected function doLockIsFree( string $lockName, string $method ) { return true; // not implemented } /** * @inheritDoc */ public function lock( $lockName, $method, $timeout = 5, $flags = 0 ) { $lockTsUnix = $this->doLock( $lockName, $method, $timeout ); if ( $lockTsUnix !== null ) { $locked = true; $this->sessionNamedLocks[$lockName] = [ 'ts' => $lockTsUnix, 'trxId' => $this->transactionManager->getTrxId() ]; } else { $locked = false; $this->logger->info( __METHOD__ . ": failed to acquire lock '{lockname}'", [ 'lockname' => $lockName, 'db_log_category' => 'locking' ] ); } return $this->flagsHolder::contains( $flags, self::LOCK_TIMESTAMP ) ? $lockTsUnix : $locked; } /** * @see lock() * * @param string $lockName * @param string $method * @param int $timeout * @return float|null UNIX timestamp of lock acquisition; null on failure * @throws DBError */ protected function doLock( string $lockName, string $method, int $timeout ) { return microtime( true ); // not implemented } /** * @inheritDoc */ public function unlock( $lockName, $method ) { if ( !isset( $this->sessionNamedLocks[$lockName] ) ) { $released = false; $this->logger->warning( __METHOD__ . ": trying to release unheld lock '$lockName'\n", [ 'db_log_category' => 'locking' ] ); } else { $released = $this->doUnlock( $lockName, $method ); if ( $released ) { unset( $this->sessionNamedLocks[$lockName] ); } else { $this->logger->warning( __METHOD__ . ": failed to release lock '$lockName'\n", [ 'db_log_category' => 'locking' ] ); } } return $released; } /** * @see unlock() * * @param string $lockName * @param string $method * @return bool Success * @throws DBError */ protected function doUnlock( string $lockName, string $method ) { return true; // not implemented } public function getScopedLockAndFlush( $lockKey, $fname, $timeout ) { $this->transactionManager->onGetScopedLockAndFlush( $this, $fname ); if ( !$this->lock( $lockKey, $fname, $timeout ) ) { return null; } $unlocker = new ScopedCallback( function () use ( $lockKey, $fname ) { // Note that the callback can be reached due to an exception making the calling // function end early. If the transaction/session is in an error state, avoid log // spam and confusing replacement of an original DBError with one about unlock(). // Unlock query will fail anyway; avoid possibly triggering errors in rollback() if ( $this->transactionManager->sessionStatus() === TransactionManager::STATUS_SESS_ERROR || $this->transactionManager->trxStatus() === TransactionManager::STATUS_TRX_ERROR ) { return; } if ( $this->trxLevel() ) { $this->onTransactionResolution( function () use ( $lockKey, $fname ) { $this->unlock( $lockKey, $fname ); }, $fname ); } else { $this->unlock( $lockKey, $fname ); } } ); $this->commit( $fname, self::FLUSHING_INTERNAL ); return $unlocker; } public function dropTable( $table, $fname = __METHOD__ ) { if ( !$this->tableExists( $table, $fname ) ) { return false; } $query = new Query( $this->platform->dropTableSqlText( $table ), self::QUERY_CHANGE_SCHEMA, 'DROP', $table ); $this->query( $query, $fname ); return true; } public function truncateTable( $table, $fname = __METHOD__ ) { $sql = "TRUNCATE TABLE " . $this->tableName( $table ); $query = new Query( $sql, self::QUERY_CHANGE_SCHEMA, 'TRUNCATE', $table ); $this->query( $query, $fname ); } public function isReadOnly() { return ( $this->getReadOnlyReason() !== null ); } /** * @return array|null Tuple of (reason string, "role" or "lb") if read-only; null otherwise */ protected function getReadOnlyReason() { $reason = $this->replicationReporter->getTopologyBasedReadOnlyReason(); if ( $reason ) { return $reason; } $reason = $this->getLBInfo( self::LB_READ_ONLY_REASON ); if ( is_string( $reason ) ) { return [ $reason, 'lb' ]; } return null; } /** * Get the underlying binding connection handle * * Makes sure the connection resource is set (disconnects and ping() failure can unset it). * This catches broken callers than catch and ignore disconnection exceptions. * Unlike checking isOpen(), this is safe to call inside of open(). * * @return mixed * @throws DBUnexpectedError * @since 1.26 */ protected function getBindingHandle() { if ( !$this->conn ) { throw new DBUnexpectedError( $this, 'DB connection was already closed or the connection dropped' ); } return $this->conn; } /** * Demark the start of a critical section of session/transaction state changes * * Use this to disable potentially DB handles due to corruption from highly unexpected * exceptions (e.g. from zend timers or coding errors) preempting execution of methods. * * Callers must demark completion of the critical section with completeCriticalSection(). * Callers should handle DBError exceptions that do not cause object state corruption by * catching them, calling completeCriticalSection(), and then rethrowing them. * * @code * $cs = $this->commenceCriticalSection( __METHOD__ ); * try { * //...send a query that changes the session/transaction state... * } catch ( DBError $e ) { * $this->completeCriticalSection( __METHOD__, $cs ); * throw $expectedException; * } * try { * //...send another query that changes the session/transaction state... * } catch ( DBError $trxError ) { * // Require ROLLBACK before allowing any other queries from outside callers * $this->completeCriticalSection( __METHOD__, $cs, $trxError ); * throw $expectedException; * } * // ...update session state fields of $this... * $this->completeCriticalSection( __METHOD__, $cs ); * @endcode * * @see Database::completeCriticalSection() * * @since 1.36 * @param string $fname Caller name * @return CriticalSectionScope|null RAII-style monitor (topmost sections only) * @throws DBUnexpectedError If an unresolved critical section error already exists */ protected function commenceCriticalSection( string $fname ) { if ( $this->csmError ) { throw new DBUnexpectedError( $this, "Cannot execute $fname critical section while session state is out of sync.\n\n" . $this->csmError->getMessage() . "\n" . $this->csmError->getTraceAsString() ); } if ( $this->csmId ) { $csm = null; // fold into the outer critical section } elseif ( $this->csProvider ) { $csm = $this->csProvider->scopedEnter( $fname, null, // emergency limit (default) null, // emergency callback (default) function () use ( $fname ) { // Mark a critical section as having been aborted by an error $e = new RuntimeException( "A critical section from {$fname} has failed" ); $this->csmError = $e; $this->csmId = null; } ); $this->csmId = $csm->getId(); $this->csmFname = $fname; } else { $csm = null; // not supported } return $csm; } /** * Demark the completion of a critical section of session/transaction state changes * * @see Database::commenceCriticalSection() * * @since 1.36 * @param string $fname Caller name * @param CriticalSectionScope|null $csm RAII-style monitor (topmost sections only) * @param Throwable|null $trxError Error that requires setting STATUS_TRX_ERROR (if any) */ protected function completeCriticalSection( string $fname, ?CriticalSectionScope $csm, ?Throwable $trxError = null ) { if ( $csm !== null ) { if ( $this->csmId === null ) { throw new LogicException( "$fname critical section is not active" ); } elseif ( $csm->getId() !== $this->csmId ) { throw new LogicException( "$fname critical section is not the active ({$this->csmFname}) one" ); } $csm->exit(); $this->csmId = null; } if ( $trxError ) { $this->transactionManager->setTransactionError( $trxError ); } } public function __toString() { $id = spl_object_id( $this ); $description = $this->getType() . ' object #' . $id; // phpcs:ignore MediaWiki.Usage.ForbiddenFunctions.is_resource if ( is_resource( $this->conn ) ) { $description .= ' (' . (string)$this->conn . ')'; // "resource id #" } elseif ( is_object( $this->conn ) ) { $handleId = spl_object_id( $this->conn ); $description .= " (handle id #$handleId)"; } return $description; } /** * Make sure that copies do not share the same client binding handle * @throws DBConnectionError */ public function __clone() { $this->logger->warning( "Cloning " . static::class . " is not recommended; forking connection", [ 'exception' => new RuntimeException(), 'db_log_category' => 'connection' ] ); if ( $this->isOpen() ) { // Open a new connection resource without messing with the old one $this->conn = null; $this->transactionManager->clearEndCallbacks(); $this->handleSessionLossPreconnect(); // no trx or locks anymore $this->open( $this->connectionParams[self::CONN_HOST], $this->connectionParams[self::CONN_USER], $this->connectionParams[self::CONN_PASSWORD], $this->currentDomain->getDatabase(), $this->currentDomain->getSchema(), $this->tablePrefix() ); $this->lastPing = microtime( true ); } } /** * Called by serialize. Throw an exception when DB connection is serialized. * This causes problems on some database engines because the connection is * not restored on unserialize. * @return never */ public function __sleep() { throw new RuntimeException( 'Database serialization may cause problems, since ' . 'the connection is not restored on wakeup' ); } /** * Run a few simple checks and close dangling connections */ public function __destruct() { if ( $this->transactionManager ) { // Tests mock this class and disable constructor. $this->transactionManager->onDestruct(); } $danglingWriters = $this->pendingWriteAndCallbackCallers(); if ( $danglingWriters ) { $fnames = implode( ', ', $danglingWriters ); trigger_error( "DB transaction writes or callbacks still pending ($fnames)" ); } if ( $this->conn ) { // Avoid connection leaks. Normally, resources close at script completion. // The connection might already be closed in PHP by now, so suppress warnings. AtEase::suppressWarnings(); $this->closeConnection(); AtEase::restoreWarnings(); $this->conn = null; } } /* Start of methods delegated to DatabaseFlags. Avoid using them outside of rdbms library */ public function setFlag( $flag, $remember = self::REMEMBER_NOTHING ) { $this->flagsHolder->setFlag( $flag, $remember ); } public function clearFlag( $flag, $remember = self::REMEMBER_NOTHING ) { $this->flagsHolder->clearFlag( $flag, $remember ); } public function restoreFlags( $state = self::RESTORE_PRIOR ) { $this->flagsHolder->restoreFlags( $state ); } public function getFlag( $flag ) { return $this->flagsHolder->getFlag( $flag ); } /* End of methods delegated to DatabaseFlags. */ /* Start of methods delegated to TransactionManager. Avoid using them outside of rdbms library */ final public function trxLevel() { // FIXME: A lot of tests disable constructor leading to trx manager being // null and breaking, this is unacceptable but hopefully this should // happen less by moving these functions to the transaction manager class. if ( !$this->transactionManager ) { $this->transactionManager = new TransactionManager( new NullLogger() ); } return $this->transactionManager->trxLevel(); } public function trxTimestamp() { return $this->transactionManager->trxTimestamp(); } public function trxStatus() { return $this->transactionManager->trxStatus(); } public function writesPending() { return $this->transactionManager->writesPending(); } public function writesOrCallbacksPending() { return $this->transactionManager->writesOrCallbacksPending(); } public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL ) { return $this->transactionManager->pendingWriteQueryDuration( $type ); } public function pendingWriteCallers() { if ( !$this->transactionManager ) { return []; } return $this->transactionManager->pendingWriteCallers(); } public function pendingWriteAndCallbackCallers() { if ( !$this->transactionManager ) { return []; } return $this->transactionManager->pendingWriteAndCallbackCallers(); } public function runOnTransactionPreCommitCallbacks() { return $this->transactionManager->runOnTransactionPreCommitCallbacks( $this ); } public function explicitTrxActive() { return $this->transactionManager->explicitTrxActive(); } /* End of methods delegated to TransactionManager. */ /* Start of methods delegated to SQLPlatform. Avoid using them outside of rdbms library */ public function implicitOrderby() { return $this->platform->implicitOrderby(); } public function selectSQLText( $tables, $vars, $conds = '', $fname = __METHOD__, $options = [], $join_conds = [] ) { return $this->platform->selectSQLText( $tables, $vars, $conds, $fname, $options, $join_conds ); } public function buildComparison( string $op, array $conds ): string { return $this->platform->buildComparison( $op, $conds ); } public function makeList( array $a, $mode = self::LIST_COMMA ) { return $this->platform->makeList( $a, $mode ); } public function makeWhereFrom2d( $data, $baseKey, $subKey ) { return $this->platform->makeWhereFrom2d( $data, $baseKey, $subKey ); } public function factorConds( $condsArray ) { return $this->platform->factorConds( $condsArray ); } public function bitNot( $field ) { return $this->platform->bitNot( $field ); } public function bitAnd( $fieldLeft, $fieldRight ) { return $this->platform->bitAnd( $fieldLeft, $fieldRight ); } public function bitOr( $fieldLeft, $fieldRight ) { return $this->platform->bitOr( $fieldLeft, $fieldRight ); } public function buildConcat( $stringList ) { return $this->platform->buildConcat( $stringList ); } public function buildGreatest( $fields, $values ) { return $this->platform->buildGreatest( $fields, $values ); } public function buildLeast( $fields, $values ) { return $this->platform->buildLeast( $fields, $values ); } public function buildSubstring( $input, $startPosition, $length = null ) { return $this->platform->buildSubstring( $input, $startPosition, $length ); } public function buildStringCast( $field ) { return $this->platform->buildStringCast( $field ); } public function buildIntegerCast( $field ) { return $this->platform->buildIntegerCast( $field ); } public function tableName( string $name, $format = 'quoted' ) { return $this->platform->tableName( $name, $format ); } public function tableNamesN( ...$tables ) { return $this->platform->tableNamesN( ...$tables ); } public function addIdentifierQuotes( $s ) { return $this->platform->addIdentifierQuotes( $s ); } public function isQuotedIdentifier( $name ) { return $this->platform->isQuotedIdentifier( $name ); } public function buildLike( $param, ...$params ) { return $this->platform->buildLike( $param, ...$params ); } public function anyChar() { return $this->platform->anyChar(); } public function anyString() { return $this->platform->anyString(); } public function limitResult( $sql, $limit, $offset = false ) { return $this->platform->limitResult( $sql, $limit, $offset ); } public function unionSupportsOrderAndLimit() { return $this->platform->unionSupportsOrderAndLimit(); } public function unionQueries( $sqls, $all, $options = [] ) { return $this->platform->unionQueries( $sqls, $all, $options ); } public function conditional( $cond, $caseTrueExpression, $caseFalseExpression ) { return $this->platform->conditional( $cond, $caseTrueExpression, $caseFalseExpression ); } public function strreplace( $orig, $old, $new ) { return $this->platform->strreplace( $orig, $old, $new ); } public function timestamp( $ts = 0 ) { return $this->platform->timestamp( $ts ); } public function timestampOrNull( $ts = null ) { return $this->platform->timestampOrNull( $ts ); } public function getInfinity() { return $this->platform->getInfinity(); } public function encodeExpiry( $expiry ) { return $this->platform->encodeExpiry( $expiry ); } public function decodeExpiry( $expiry, $format = TS_MW ) { return $this->platform->decodeExpiry( $expiry, $format ); } public function setTableAliases( array $aliases ) { $this->platform->setTableAliases( $aliases ); } public function getTableAliases() { return $this->platform->getTableAliases(); } public function setIndexAliases( array $aliases ) { $this->platform->setIndexAliases( $aliases ); } public function buildGroupConcatField( $delim, $tables, $field, $conds = '', $join_conds = [] ) { return $this->platform->buildGroupConcatField( $delim, $tables, $field, $conds, $join_conds ); } public function buildSelectSubquery( $tables, $vars, $conds = '', $fname = __METHOD__, $options = [], $join_conds = [] ) { return $this->platform->buildSelectSubquery( $tables, $vars, $conds, $fname, $options, $join_conds ); } public function buildExcludedValue( $column ) { return $this->platform->buildExcludedValue( $column ); } public function setSchemaVars( $vars ) { $this->platform->setSchemaVars( $vars ); } /* End of methods delegated to SQLPlatform. */ /* Start of methods delegated to ReplicationReporter. */ public function primaryPosWait( DBPrimaryPos $pos, $timeout ) { return $this->replicationReporter->primaryPosWait( $this, $pos, $timeout ); } public function getPrimaryPos() { return $this->replicationReporter->getPrimaryPos( $this ); } public function getLag() { return $this->replicationReporter->getLag( $this ); } public function getSessionLagStatus() { return $this->replicationReporter->getSessionLagStatus( $this ); } /* End of methods delegated to ReplicationReporter. */ }