diff options
author | Aaron Schulz <aschulz@wikimedia.org> | 2020-03-02 14:20:09 -0800 |
---|---|---|
committer | Krinkle <krinklemail@gmail.com> | 2020-03-10 22:26:04 +0000 |
commit | 13b11a946ea4c5f98d607c0f974313ac98d039d2 (patch) | |
tree | d7aaf6dd09e1c2717d2cfaa1b52942cc769ce9c2 | |
parent | 0da37edd0baf309b5eb4ae379580098c2e81a552 (diff) | |
download | mediawikicore-13b11a946ea4c5f98d607c0f974313ac98d039d2.tar.gz mediawikicore-13b11a946ea4c5f98d607c0f974313ac98d039d2.zip |
rdbms: reduce duplication in Database via helper methods
Add several new internal methods to help with wrangling
the various formats that rows, conditions, options, and
unique key lists can come in. Remove the now-unused method
isMultiRowArray().
Add various sanity checks and logging for parameters to
upsert(), replace(), insert(), and insertSelect().
Move DatabasePostgresTest to the integration/ directory.
Change-Id: If5988a6f0816e8da2cbf2fd612e1a3e3a2e9c52f
-rw-r--r-- | includes/libs/rdbms/database/Database.php | 557 | ||||
-rw-r--r-- | includes/libs/rdbms/database/DatabaseMysqlBase.php | 38 | ||||
-rw-r--r-- | includes/libs/rdbms/database/DatabasePostgres.php | 128 | ||||
-rw-r--r-- | includes/libs/rdbms/database/DatabaseSqlite.php | 17 | ||||
-rw-r--r-- | tests/phpunit/includes/db/DatabaseTestHelper.php | 16 | ||||
-rw-r--r-- | tests/phpunit/includes/libs/rdbms/database/DatabaseSQLTest.php | 80 | ||||
-rw-r--r-- | tests/phpunit/integration/includes/db/DatabasePostgresTest.php (renamed from tests/phpunit/includes/db/DatabasePostgresTest.php) | 4 | ||||
-rw-r--r-- | tests/phpunit/integration/includes/db/DatabaseSqliteTest.php | 10 |
8 files changed, 489 insertions, 361 deletions
diff --git a/includes/libs/rdbms/database/Database.php b/includes/libs/rdbms/database/Database.php index e1bbd50bb3a4..be1eee6d547b 100644 --- a/includes/libs/rdbms/database/Database.php +++ b/includes/libs/rdbms/database/Database.php @@ -1653,10 +1653,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware throw new DBUnexpectedError( $this, "Cannot use a * field: got '$var'" ); } - if ( !is_array( $options ) ) { - $options = [ $options ]; - } - + $options = $this->normalizeOptions( $options ); $options['LIMIT'] = 1; $res = $this->select( $table, $var, $cond, $fname, $options, $join_conds ); @@ -1681,10 +1678,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware throw new DBUnexpectedError( $this, "Cannot use an array of fields" ); } - if ( !is_array( $options ) ) { - $options = [ $options ]; - } - + $options = $this->normalizeOptions( $options ); $res = $this->select( $table, [ 'value' => $var ], $cond, $fname, $options, $join_conds ); if ( $res === false ) { throw new DBUnexpectedError( $this, "Got false from select()" ); @@ -2020,9 +2014,35 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware } /** + * @param array $rowOrRows A single (field => value) map or a list of such maps + * @return array[] List of (field => value) maps + * @since 1.35 + */ + final protected function normalizeRowArray( array $rowOrRows ) { + if ( !$rowOrRows ) { + $rows = []; + } elseif ( isset( $rowOrRows[0] ) ) { + $rows = $rowOrRows; + } else { + $rows = [ $rowOrRows ]; + } + + foreach ( $rows as $row ) { + if ( !is_array( $row ) ) { + throw new DBUnexpectedError( $this, "Got non-array in row array" ); + } elseif ( !$row ) { + throw new DBUnexpectedError( $this, "Got empty array in row array" ); + } + } + + return $rows; + } + + /** * @param array|string $conds * @param string $fname * @return array + * @since 1.31 */ final protected function normalizeConditions( $conds, $fname ) { if ( $conds 
=== null || $conds === false ) { @@ -2032,20 +2052,100 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware . $fname . ' with incorrect parameters: $conds must be a string or an array' ); - $conds = ''; + return []; + } elseif ( $conds === '' ) { + return []; + } + + return is_array( $conds ) ? $conds : [ $conds ]; + } + + /** + * @param string|string[]|string[][] $uniqueKeys Unique indexes (first is identity key) + * @return string[][] Unique indexes as column lists (first index is the identity key) + * @since 1.35 + */ + final protected function normalizeUpsertKeys( $uniqueKeys ) { + if ( is_string( $uniqueKeys ) ) { + return [ [ $uniqueKeys ] ]; + } + + if ( !is_array( $uniqueKeys ) || !$uniqueKeys ) { + throw new DBUnexpectedError( $this, 'Invalid or empty unique key array' ); + } + + $oldStyle = false; + $uniqueColumnSets = []; + foreach ( $uniqueKeys as $i => $uniqueKey ) { + if ( !is_int( $i ) ) { + throw new DBUnexpectedError( $this, 'Unique key array should be a list' ); + } elseif ( is_string( $uniqueKey ) ) { + $oldStyle = true; + $uniqueColumnSets[] = [ $uniqueKey ]; + } elseif ( is_array( $uniqueKey ) && $uniqueKey ) { + $uniqueColumnSets[] = $uniqueKey; + } else { + throw new DBUnexpectedError( $this, 'Invalid unique key array entry' ); + } + } + + if ( count( $uniqueColumnSets ) > 1 ) { + // If an existing row conflicts with new row X on key A and new row Y on key B, + // it is not well defined how many UPDATEs should apply to the existing row and + // in what order the new rows are checked + $this->queryLogger->warning( + __METHOD__ . " called with multiple unique keys", + [ 'exception' => new RuntimeException() ] + ); + } + + if ( $oldStyle ) { + // Passing a list of strings for single-column unique keys is too + // easily confused with passing the columns of composite unique key + $this->queryLogger->warning( + __METHOD__ . " called with deprecated parameter style: " . 
+ "the unique key array should be a string or array of string arrays", + [ 'exception' => new RuntimeException() ] + ); + } + + return $uniqueColumnSets; + } + + /** + * @param string|array $options + * @return array Combination option/value map and boolean option list + * @since 1.35 + */ + final protected function normalizeOptions( $options ) { + if ( is_array( $options ) ) { + return $options; + } elseif ( is_string( $options ) ) { + return ( $options === '' ) ? [] : [ $options ]; + } else { + throw new DBUnexpectedError( $this, __METHOD__ . ': expected string or array' ); } + } - if ( !is_array( $conds ) ) { - $conds = ( $conds === '' ) ? [] : [ $conds ]; + /** + * @param string $option Query option flag (e.g. "IGNORE" or "FOR UPDATE") + * @param array $options Combination option/value map and boolean option list + * @return bool Whether the option appears as an integer-keyed value in the options + * @since 1.35 + */ + final protected function isFlagInOptions( $option, array $options ) { + foreach ( array_keys( $options, $option, true ) as $k ) { + if ( is_int( $k ) ) { + return true; + } } - return $conds; + return false; } /** * @param array|string $var Field parameter in the style of select() * @return string|null Column name or null; ignores aliases - * @throws DBUnexpectedError Errors out if multiple columns are given */ final protected function extractSingleFieldFromList( $var ) { if ( is_array( $var ) ) { @@ -2110,65 +2210,100 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware return !$indexInfo[0]->Non_unique; } - /** - * Helper for Database::insert(). 
- * - * @param array $options - * @return string - */ - protected function makeInsertOptions( $options ) { - return implode( ' ', $options ); + public function insert( $table, $rows, $fname = __METHOD__, $options = [] ) { + $rows = $this->normalizeRowArray( $rows ); + if ( !$rows ) { + return true; + } + + $options = $this->normalizeOptions( $options ); + if ( $this->isFlagInOptions( 'IGNORE', $options ) ) { + $this->doInsertNonConflicting( $table, $rows, $fname ); + } else { + $this->doInsert( $table, $rows, $fname ); + } + + return true; } /** - * @param array $a A single (field => value) map or a list of such maps - * @return bool + * @see Database::insert() + * @param string $table + * @param array $rows Non-empty list of rows + * @param string $fname + * @since 1.35 */ - final protected function isMultiRowArray( array $a ) { - return ( isset( $a[0] ) && is_array( $a[0] ) ); + protected function doInsert( $table, array $rows, $fname ) { + $encTable = $this->tableName( $table ); + list( $sqlColumns, $sqlTuples ) = $this->makeInsertLists( $rows ); + + $sql = "INSERT INTO $encTable ($sqlColumns) VALUES $sqlTuples"; + + $this->query( $sql, $fname ); } - public function insert( $table, $rows, $fname = __METHOD__, $options = [] ) { - # No rows to insert, easy just return now - if ( !count( $rows ) ) { - return true; - } + /** + * @see Database::insert() + * @param string $table + * @param array $rows Non-empty list of rows + * @param string $fname + * @since 1.35 + */ + protected function doInsertNonConflicting( $table, array $rows, $fname ) { + $encTable = $this->tableName( $table ); + list( $sqlColumns, $sqlTuples ) = $this->makeInsertLists( $rows ); + list( $sqlVerb, $sqlOpts ) = $this->makeInsertNonConflictingVerbAndOptions(); - $table = $this->tableName( $table ); + $sql = rtrim( "$sqlVerb $encTable ($sqlColumns) VALUES $sqlTuples $sqlOpts" ); - if ( !is_array( $options ) ) { - $options = [ $options ]; - } + $this->query( $sql, $fname ); + } - $options = 
$this->makeInsertOptions( $options ); + /** + * @return string[] ("INSERT"-style SQL verb, "ON CONFLICT"-style clause or "") + * @since 1.35 + */ + protected function makeInsertNonConflictingVerbAndOptions() { + return [ 'INSERT IGNORE INTO', '' ]; + } - $multi = $this->isMultiRowArray( $rows ); - if ( $multi ) { - $keys = array_keys( $rows[0] ); - } else { - $keys = array_keys( $rows ); + /** + * Make SQL lists of columns, row tuples for INSERT/VALUES expressions + * + * The tuple column order is that of the columns of the first provided row. + * The provided rows must have exactly the same keys and ordering thereof. + * + * @param array[] $rows Non-empty list of (column => value) maps + * @return array (comma-separated columns, comma-separated tuples) + * @since 1.35 + */ + protected function makeInsertLists( array $rows ) { + $firstRow = $rows[0]; + if ( !is_array( $firstRow ) || !$firstRow ) { + throw new DBUnexpectedError( $this, 'Got an empty row list or empty row' ); } + // List of columns that define the value tuple ordering + $tupleColumns = array_keys( $firstRow ); - $sql = 'INSERT ' . $options . - " INTO $table (" . implode( ',', $keys ) . ') VALUES '; - - if ( $multi ) { - $first = true; - foreach ( $rows as $row ) { - if ( $first ) { - $first = false; - } else { - $sql .= ','; - } - $sql .= '(' . $this->makeList( $row ) . ')'; + $valueTuples = []; + foreach ( $rows as $row ) { + $rowColumns = array_keys( $row ); + // VALUES(...) requires a uniform correspondance of (column => value) + if ( $rowColumns !== $tupleColumns ) { + throw new DBUnexpectedError( + $this, + 'Got row columns (' . implode( ', ', $rowColumns ) . ') ' . + 'instead of expected (' . implode( ', ', $tupleColumns ) . ')' + ); } - } else { - $sql .= '(' . $this->makeList( $rows ) . ')'; + // Make the value tuple that defines this row + $valueTuples[] = '(' . $this->makeList( $row, self::LIST_COMMA ) . 
')'; } - $this->query( $sql, $fname ); - - return true; + return [ + $this->makeList( $tupleColumns, self::LIST_NAMES ), + implode( ',', $valueTuples ) + ]; } /** @@ -2178,9 +2313,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware * @return array */ protected function makeUpdateOptionsArray( $options ) { - if ( !is_array( $options ) ) { - $options = [ $options ]; - } + $options = $this->normalizeOptions( $options ); $opts = []; @@ -2894,137 +3027,164 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware } public function replace( $table, $uniqueKeys, $rows, $fname = __METHOD__ ) { - if ( count( $rows ) == 0 ) { + $rows = $this->normalizeRowArray( $rows ); + if ( !$rows ) { return; } - $uniqueKeys = (array)$uniqueKeys; - // Single row case - if ( !is_array( reset( $rows ) ) ) { - $rows = [ $rows ]; + if ( $uniqueKeys ) { + $uniqueKeys = $this->normalizeUpsertKeys( $uniqueKeys ); + $this->doReplace( $table, $uniqueKeys, $rows, $fname ); + } else { + $this->queryLogger->warning( + __METHOD__ . 
" called with no unique keys", + [ 'exception' => new RuntimeException() ] + ); + $this->doInsert( $table, $rows, $fname ); } + } + /** + * @see Database::replace() + * @param string $table + * @param string[][] $uniqueKeys Non-empty list of unique keys + * @param array $rows Non-empty list of rows + * @param string $fname + * @since 1.35 + */ + protected function doReplace( $table, array $uniqueKeys, array $rows, $fname ) { + $affectedRowCount = 0; + $this->startAtomic( $fname, self::ATOMIC_CANCELABLE ); try { - $this->startAtomic( $fname, self::ATOMIC_CANCELABLE ); - $affectedRowCount = 0; foreach ( $rows as $row ) { - // Delete rows which collide with this one - $indexWhereClauses = []; - foreach ( $uniqueKeys as $index ) { - $indexColumns = (array)$index; - $indexRowValues = array_intersect_key( $row, array_flip( $indexColumns ) ); - if ( count( $indexRowValues ) != count( $indexColumns ) ) { - throw new DBUnexpectedError( - $this, - 'New record does not provide all values for unique key (' . - implode( ', ', $indexColumns ) . ')' - ); - } elseif ( in_array( null, $indexRowValues, true ) ) { - throw new DBUnexpectedError( - $this, - 'New record has a null value for unique key (' . - implode( ', ', $indexColumns ) . 
')' - ); - } - $indexWhereClauses[] = $this->makeList( $indexRowValues, LIST_AND ); - } - - if ( $indexWhereClauses ) { - $this->delete( $table, $this->makeList( $indexWhereClauses, LIST_OR ), $fname ); - $affectedRowCount += $this->affectedRows(); - } - + // Delete any conflicting rows (including ones inserted from $rows) + $sqlCondition = $this->makeConditionCollidesUponKeys( [ $row ], $uniqueKeys ); + $this->delete( $table, [ $sqlCondition ], $fname ); + $affectedRowCount += $this->affectedRows(); // Now insert the row $this->insert( $table, $row, $fname ); $affectedRowCount += $this->affectedRows(); } $this->endAtomic( $fname ); - $this->affectedRowCount = $affectedRowCount; } catch ( Exception $e ) { $this->cancelAtomic( $fname ); throw $e; } + $this->affectedRowCount = $affectedRowCount; } /** - * REPLACE query wrapper for MySQL and SQLite, which have a native REPLACE - * statement. - * - * @param string $table Table name - * @param array|string $rows Row(s) to insert - * @param string $fname Caller function name + * @param array[] $rows Non-empty list of rows + * @param string[] $uniqueKey List of columns that define a single unique index + * @return string SQL conditions to filter existing rows to those with counterparts in $rows */ - protected function nativeReplace( $table, $rows, $fname ) { - $table = $this->tableName( $table ); + private function makeConditionCollidesUponKey( array $rows, array $uniqueKey ) { + if ( !$rows ) { + throw new DBUnexpectedError( $this, "Empty row array" ); + } elseif ( !$uniqueKey ) { + throw new DBUnexpectedError( $this, "Empty unique key array" ); + } + + if ( count( $uniqueKey ) == 1 ) { + // Use a simple IN(...) 
clause + $column = reset( $uniqueKey ); + $values = array_column( $rows, $column ); + if ( count( $values ) !== count( $rows ) ) { + throw new DBUnexpectedError( $this, "Missing values for unique key ($column)" ); + } - # Single row case - if ( !is_array( reset( $rows ) ) ) { - $rows = [ $rows ]; + return $this->makeList( [ $column => $values ], self::LIST_AND ); } - $sql = "REPLACE INTO $table (" . implode( ',', array_keys( $rows[0] ) ) . ') VALUES '; - $first = true; - + $disjunctions = []; foreach ( $rows as $row ) { - if ( $first ) { - $first = false; - } else { - $sql .= ','; + $rowKeyMap = array_intersect_key( $row, array_flip( $uniqueKey ) ); + if ( count( $rowKeyMap ) != count( $uniqueKey ) ) { + throw new DBUnexpectedError( + $this, + "Missing values for unique key (" . implode( ',', $uniqueKey ) . ")" + ); } - - $sql .= '(' . $this->makeList( $row ) . ')'; + $disjunctions[] = $this->makeList( $rowKeyMap, self::LIST_AND ); } - $this->query( $sql, $fname ); + return count( $disjunctions ) > 1 + ? 
$this->makeList( $disjunctions, self::LIST_OR ) + : $disjunctions[0]; } - public function upsert( $table, array $rows, $uniqueKeys, array $set, $fname = __METHOD__ ) { - if ( $rows === [] ) { - return true; // nothing to do + /** + * @param array[] $rows Non-empty list of rows + * @param string[][] $uniqueKeys List of column lists that each define a unique index + * @return string SQL conditions to filter existing rows to those with counterparts in $rows + * @since 1.35 + */ + final protected function makeConditionCollidesUponKeys( array $rows, array $uniqueKeys ) { + if ( !$uniqueKeys ) { + throw new DBUnexpectedError( $this, "Empty unique key array" ); } - $uniqueKeys = (array)$uniqueKeys; - if ( !is_array( reset( $rows ) ) ) { - $rows = [ $rows ]; + $disjunctions = []; + foreach ( $uniqueKeys as $uniqueKey ) { + $disjunctions[] = $this->makeConditionCollidesUponKey( $rows, $uniqueKey ); } - '@phan-var array[] $rows'; - if ( count( $uniqueKeys ) ) { - $clauses = []; // list WHERE clauses that each identify a single row - foreach ( $rows as $row ) { - foreach ( $uniqueKeys as $index ) { - $index = is_array( $index ) ? $index : [ $index ]; // columns - $rowKey = []; // unique key to this row - foreach ( $index as $column ) { - $rowKey[$column] = $row[$column]; - } - $clauses[] = $this->makeList( $rowKey, self::LIST_AND ); - } - } - $where = [ $this->makeList( $clauses, self::LIST_OR ) ]; + return count( $disjunctions ) > 1 + ? $this->makeList( $disjunctions, self::LIST_OR ) + : $disjunctions[0]; + } + + public function upsert( $table, array $rows, $uniqueKeys, array $set, $fname = __METHOD__ ) { + $rows = $this->normalizeRowArray( $rows ); + if ( !$rows ) { + return true; + } + + if ( $uniqueKeys ) { + $uniqueKeys = $this->normalizeUpsertKeys( $uniqueKeys ); + $this->doUpsert( $table, $rows, $uniqueKeys, $set, $fname ); } else { - $where = false; + $this->queryLogger->warning( + __METHOD__ . 
" called with no unique keys", + [ 'exception' => new RuntimeException() ] + ); + $this->doInsert( $table, $rows, $fname ); } + return true; + } + + /** + * @see Database::upsert() + * @param string $table + * @param array[] $rows Non-empty list of rows + * @param string[][] $uniqueKeys Non-empty list of unique keys + * @param array $set + * @param string $fname + * @since 1.35 + */ + protected function doUpsert( $table, array $rows, array $uniqueKeys, array $set, $fname ) { $affectedRowCount = 0; + $this->startAtomic( $fname, self::ATOMIC_CANCELABLE ); try { - $this->startAtomic( $fname, self::ATOMIC_CANCELABLE ); - # Update any existing conflicting row(s) - if ( $where !== false ) { - $this->update( $table, $set, $where, $fname ); - $affectedRowCount += $this->affectedRows(); + foreach ( $rows as $row ) { + // Update any existing conflicting rows (including ones inserted from $rows) + $sqlConditions = $this->makeConditionCollidesUponKeys( [ $row ], $uniqueKeys ); + $this->update( $table, $set, [ $sqlConditions ], $fname ); + $rowsUpdated = $this->affectedRows(); + $affectedRowCount += $rowsUpdated; + if ( $rowsUpdated <= 0 ) { + // Now insert the row if there are no conflicts + $this->insert( $table, $row, $fname ); + $affectedRowCount += $this->affectedRows(); + } } - # Now insert any non-conflicting row(s) - $this->insert( $table, $rows, $fname, [ 'IGNORE' ] ); - $affectedRowCount += $this->affectedRows(); $this->endAtomic( $fname ); - $this->affectedRowCount = $affectedRowCount; } catch ( Exception $e ) { $this->cancelAtomic( $fname ); throw $e; } - - return true; + $this->affectedRowCount = $affectedRowCount; } public function deleteJoin( $delTable, $joinTable, $delVar, $joinVar, $conds, @@ -3083,18 +3243,24 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware } final public function insertSelect( - $destTable, $srcTable, $varMap, $conds, - $fname = __METHOD__, $insertOptions = [], $selectOptions = [], $selectJoinConds = [] + 
$destTable, + $srcTable, + $varMap, + $conds, + $fname = __METHOD__, + $insertOptions = [], + $selectOptions = [], + $selectJoinConds = [] ) { static $hints = [ 'NO_AUTO_COLUMNS' ]; - $insertOptions = (array)$insertOptions; - $selectOptions = (array)$selectOptions; + $insertOptions = $this->normalizeOptions( $insertOptions ); + $selectOptions = $this->normalizeOptions( $selectOptions ); if ( $this->cliMode && $this->isInsertSelectSafe( $insertOptions, $selectOptions ) ) { // For massive migrations with downtime, we don't want to select everything // into memory and OOM, so do all this native on the server side if possible. - $this->nativeInsertSelect( + $this->doInsertSelectNative( $destTable, $srcTable, $varMap, @@ -3105,7 +3271,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware $selectJoinConds ); } else { - $this->nonNativeInsertSelect( + $this->doInsertSelectGeneric( $destTable, $srcTable, $varMap, @@ -3134,7 +3300,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware * Implementation of insertSelect() based on select() and insert() * * @see IDatabase::insertSelect() - * @since 1.30 * @param string $destTable * @param string|array $srcTable * @param array $varMap @@ -3143,10 +3308,17 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware * @param array $insertOptions * @param array $selectOptions * @param array $selectJoinConds + * @since 1.35 */ - protected function nonNativeInsertSelect( $destTable, $srcTable, $varMap, $conds, - $fname = __METHOD__, - $insertOptions = [], $selectOptions = [], $selectJoinConds = [] + protected function doInsertSelectGeneric( + $destTable, + $srcTable, + array $varMap, + $conds, + $fname, + array $insertOptions, + array $selectOptions, + $selectJoinConds ) { // For web requests, do a locking SELECT and then INSERT. This puts the SELECT burden // on only the master (without needing row-based-replication). 
It also makes it easy to @@ -3155,48 +3327,37 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware foreach ( $varMap as $dstColumn => $sourceColumnOrSql ) { $fields[] = $this->fieldNameWithAlias( $sourceColumnOrSql, $dstColumn ); } - $selectOptions[] = 'FOR UPDATE'; $res = $this->select( - $srcTable, implode( ',', $fields ), $conds, $fname, $selectOptions, $selectJoinConds + $srcTable, + implode( ',', $fields ), + $conds, + $fname, + array_merge( $selectOptions, [ 'FOR UPDATE' ] ), + $selectJoinConds ); if ( !$res ) { return; } + $affectedRowCount = 0; + $this->startAtomic( $fname, self::ATOMIC_CANCELABLE ); try { - $affectedRowCount = 0; - $this->startAtomic( $fname, self::ATOMIC_CANCELABLE ); $rows = []; - $ok = true; foreach ( $res as $row ) { $rows[] = (array)$row; - - // Avoid inserts that are too huge - if ( count( $rows ) >= $this->nonNativeInsertSelectBatchSize ) { - $ok = $this->insert( $destTable, $rows, $fname, $insertOptions ); - if ( !$ok ) { - break; - } - $affectedRowCount += $this->affectedRows(); - $rows = []; - } } - if ( $rows && $ok ) { - $ok = $this->insert( $destTable, $rows, $fname, $insertOptions ); - if ( $ok ) { - $affectedRowCount += $this->affectedRows(); - } - } - if ( $ok ) { - $this->endAtomic( $fname ); - $this->affectedRowCount = $affectedRowCount; - } else { - $this->cancelAtomic( $fname ); + // Avoid inserts that are too huge + $rowBatches = array_chunk( $rows, $this->nonNativeInsertSelectBatchSize ); + foreach ( $rowBatches as $rows ) { + $this->insert( $destTable, $rows, $fname, $insertOptions ); + $affectedRowCount += $this->affectedRows(); } } catch ( Exception $e ) { $this->cancelAtomic( $fname ); throw $e; } + $this->endAtomic( $fname ); + $this->affectedRowCount = $affectedRowCount; } /** @@ -3212,19 +3373,23 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware * @param array $insertOptions * @param array $selectOptions * @param array $selectJoinConds + * @since 
1.35 */ - protected function nativeInsertSelect( $destTable, $srcTable, $varMap, $conds, - $fname = __METHOD__, - $insertOptions = [], $selectOptions = [], $selectJoinConds = [] + protected function doInsertSelectNative( + $destTable, + $srcTable, + array $varMap, + $conds, + $fname, + array $insertOptions, + array $selectOptions, + $selectJoinConds ) { - $destTable = $this->tableName( $destTable ); - - if ( !is_array( $insertOptions ) ) { - $insertOptions = [ $insertOptions ]; - } - - $insertOptions = $this->makeInsertOptions( $insertOptions ); - + list( $sqlVerb, $sqlOpts ) = $this->isFlagInOptions( 'IGNORE', $insertOptions ) + ? $this->makeInsertNonConflictingVerbAndOptions() + : [ 'INSERT INTO', '' ]; + $encDstTable = $this->tableName( $destTable ); + $sqlDstColumns = implode( ',', array_keys( $varMap ) ); $selectSql = $this->selectSQLText( $srcTable, array_values( $varMap ), @@ -3234,9 +3399,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware $selectJoinConds ); - $sql = "INSERT $insertOptions" . - " INTO $destTable (" . implode( ',', array_keys( $varMap ) ) . ') ' . 
- $selectSql; + $sql = rtrim( "$sqlVerb $encDstTable ($sqlDstColumns) $selectSql $sqlOpts" ); $this->query( $sql, $fname ); } diff --git a/includes/libs/rdbms/database/DatabaseMysqlBase.php b/includes/libs/rdbms/database/DatabaseMysqlBase.php index 353d1a09bf3a..614a756ab4c8 100644 --- a/includes/libs/rdbms/database/DatabaseMysqlBase.php +++ b/includes/libs/rdbms/database/DatabaseMysqlBase.php @@ -448,10 +448,6 @@ abstract class DatabaseMysqlBase extends Database { return in_array( $errno, [ 2062, 3024 ] ); } - public function replace( $table, $uniqueKeys, $rows, $fname = __METHOD__ ) { - $this->nativeReplace( $table, $rows, $fname ); - } - protected function isInsertSelectSafe( array $insertOptions, array $selectOptions ) { $row = $this->getReplicationSafetyInfo(); // For row-based-replication, the resulting changes will be relayed, not the query @@ -1324,31 +1320,25 @@ abstract class DatabaseMysqlBase extends Database { $this->query( $sql, $fname ); } - public function upsert( - $table, array $rows, $uniqueKeys, array $set, $fname = __METHOD__ - ) { - if ( $rows === [] ) { - return true; // nothing to do - } + protected function doUpsert( $table, array $rows, array $uniqueKeys, array $set, $fname ) { + $encTable = $this->tableName( $table ); + list( $sqlColumns, $sqlTuples ) = $this->makeInsertLists( $rows ); + $sqlColumnAssignments = $this->makeList( $set, self::LIST_SET ); - if ( !is_array( reset( $rows ) ) ) { - $rows = [ $rows ]; - } + $sql = + "INSERT INTO $encTable ($sqlColumns) VALUES $sqlTuples " . + "ON DUPLICATE KEY UPDATE $sqlColumnAssignments"; - $table = $this->tableName( $table ); - $columns = array_keys( $rows[0] ); + $this->query( $sql, $fname ); + } - $sql = "INSERT INTO $table (" . implode( ',', $columns ) . ') VALUES '; - $rowTuples = []; - foreach ( $rows as $row ) { - $rowTuples[] = '(' . $this->makeList( $row ) . ')'; - } - $sql .= implode( ',', $rowTuples ); - $sql .= " ON DUPLICATE KEY UPDATE " . 
$this->makeList( $set, self::LIST_SET ); + protected function doReplace( $table, array $uniqueKeys, array $rows, $fname ) { + $encTable = $this->tableName( $table ); + list( $sqlColumns, $sqlTuples ) = $this->makeInsertLists( $rows ); - $this->query( $sql, $fname ); + $sql = "REPLACE INTO $encTable ($sqlColumns) VALUES $sqlTuples"; - return true; + $this->query( $sql, $fname ); } /** diff --git a/includes/libs/rdbms/database/DatabasePostgres.php b/includes/libs/rdbms/database/DatabasePostgres.php index c16ab403e02f..66d71eabeb7d 100644 --- a/includes/libs/rdbms/database/DatabasePostgres.php +++ b/includes/libs/rdbms/database/DatabasePostgres.php @@ -542,93 +542,53 @@ __INDEXATTR__; return parent::selectSQLText( $table, $vars, $conds, $fname, $options, $join_conds ); } - /** @inheritDoc */ - public function insert( $table, $args, $fname = __METHOD__, $options = [] ) { - if ( !count( $args ) ) { - return true; - } + protected function makeInsertNonConflictingVerbAndOptions() { + return [ 'INSERT INTO', 'ON CONFLICT DO NOTHING' ]; + } - $table = $this->tableName( $table ); - if ( !isset( $this->numericVersion ) ) { - $this->getServerVersion(); - } + public function doInsertNonConflicting( $table, array $rows, $fname ) { + // Postgres 9.5 supports "ON CONFLICT" + if ( $this->getServerVersion() >= 9.5 ) { + parent::doInsertNonConflicting( $table, $rows, $fname ); - if ( !is_array( $options ) ) { - $options = [ $options ]; + return; } - if ( isset( $args[0] ) && is_array( $args[0] ) ) { - $rows = $args; - $keys = array_keys( $args[0] ); - } else { - $rows = [ $args ]; - $keys = array_keys( $args ); - } - - $ignore = in_array( 'IGNORE', $options ); - - $sql = "INSERT INTO $table (" . implode( ',', $keys ) . 
') VALUES '; - - if ( $this->numericVersion >= 9.5 || !$ignore ) { - // No IGNORE or our PG has "ON CONFLICT DO NOTHING" - $first = true; + $affectedRowCount = 0; + // Emulate INSERT IGNORE via savepoints/rollback + $tok = $this->startAtomic( "$fname (outer)", self::ATOMIC_CANCELABLE ); + try { + $encTable = $this->tableName( $table ); foreach ( $rows as $row ) { - if ( $first ) { - $first = false; - } else { - $sql .= ','; - } - $sql .= '(' . $this->makeList( $row ) . ')'; - } - if ( $ignore ) { - $sql .= ' ON CONFLICT DO NOTHING'; - } - $this->query( $sql, $fname ); - } else { - // Emulate IGNORE by doing each row individually, with savepoints - // to roll back as necessary. - $numrowsinserted = 0; - - $tok = $this->startAtomic( "$fname (outer)", self::ATOMIC_CANCELABLE ); - try { - foreach ( $rows as $row ) { - $tempsql = $sql; - $tempsql .= '(' . $this->makeList( $row ) . ')'; - - $this->startAtomic( "$fname (inner)", self::ATOMIC_CANCELABLE ); - try { - $this->query( $tempsql, $fname ); - $this->endAtomic( "$fname (inner)" ); - $numrowsinserted++; - } catch ( DBQueryError $e ) { - $this->cancelAtomic( "$fname (inner)" ); - // Our IGNORE is supposed to ignore duplicate key errors, but not others. - // (even though MySQL's version apparently ignores all errors) - if ( $e->errno !== '23505' ) { - throw $e; - } + list( $sqlColumns, $sqlTuples ) = $this->makeInsertLists( [ $row ] ); + $tempsql = "INSERT INTO $encTable ($sqlColumns) VALUES ($sqlTuples)"; + + $this->startAtomic( "$fname (inner)", self::ATOMIC_CANCELABLE ); + try { + $this->query( $tempsql, $fname ); + $this->endAtomic( "$fname (inner)" ); + $affectedRowCount++; + } catch ( DBQueryError $e ) { + $this->cancelAtomic( "$fname (inner)" ); + // Our IGNORE is supposed to ignore duplicate key errors, but not others. 
+ // (even though MySQL's version apparently ignores all errors) + if ( $e->errno !== '23505' ) { + throw $e; } } - } catch ( Exception $e ) { - $this->cancelAtomic( "$fname (outer)", $tok ); - throw $e; } - $this->endAtomic( "$fname (outer)" ); - - // Set the affected row count for the whole operation - $this->affectedRowCount = $numrowsinserted; + } catch ( Exception $e ) { + $this->cancelAtomic( "$fname (outer)", $tok ); + throw $e; } - - return true; + $this->endAtomic( "$fname (outer)" ); + // Set the affected row count for the whole operation + $this->affectedRowCount = $affectedRowCount; } protected function makeUpdateOptionsArray( $options ) { - if ( !is_array( $options ) ) { - $options = [ $options ]; - } - - // PostgreSQL doesn't support anything like "ignore" for - // UPDATE. + $options = $this->normalizeOptions( $options ); + // PostgreSQL doesn't support anything like "ignore" for UPDATE. $options = array_diff( $options, [ 'IGNORE' ] ); return parent::makeUpdateOptionsArray( $options ); @@ -652,9 +612,15 @@ __INDEXATTR__; * @param array $selectOptions * @param array $selectJoinConds */ - protected function nativeInsertSelect( - $destTable, $srcTable, $varMap, $conds, $fname = __METHOD__, - $insertOptions = [], $selectOptions = [], $selectJoinConds = [] + protected function doInsertSelectNative( + $destTable, + $srcTable, + array $varMap, + $conds, + $fname, + array $insertOptions, + array $selectOptions, + $selectJoinConds ) { if ( !is_array( $insertOptions ) ) { $insertOptions = [ $insertOptions ]; @@ -662,7 +628,7 @@ __INDEXATTR__; if ( in_array( 'IGNORE', $insertOptions ) ) { if ( $this->getServerVersion() >= 9.5 ) { - // Use ON CONFLICT DO NOTHING if we have it for IGNORE + // Use "ON CONFLICT DO" if we have it for IGNORE $destTable = $this->tableName( $destTable ); $selectSql = $this->selectSQLText( @@ -680,13 +646,13 @@ __INDEXATTR__; $this->query( $sql, $fname ); } else { // IGNORE and we don't have ON CONFLICT DO NOTHING, so just use the 
non-native version - $this->nonNativeInsertSelect( + $this->doInsertSelectGeneric( $destTable, $srcTable, $varMap, $conds, $fname, $insertOptions, $selectOptions, $selectJoinConds ); } } else { - parent::nativeInsertSelect( $destTable, $srcTable, $varMap, $conds, $fname, + parent::doInsertSelectNative( $destTable, $srcTable, $varMap, $conds, $fname, $insertOptions, $selectOptions, $selectJoinConds ); } } diff --git a/includes/libs/rdbms/database/DatabaseSqlite.php b/includes/libs/rdbms/database/DatabaseSqlite.php index bbf58570ec09..0671eb9af673 100644 --- a/includes/libs/rdbms/database/DatabaseSqlite.php +++ b/includes/libs/rdbms/database/DatabaseSqlite.php @@ -661,18 +661,15 @@ class DatabaseSqlite extends Database { return $options; } - /** - * @param array $options - * @return string - */ - protected function makeInsertOptions( $options ) { - $options = self::rewriteIgnoreKeyword( $options ); - - return parent::makeInsertOptions( $options ); + protected function makeInsertNonConflictingVerbAndOptions() { + return [ 'INSERT OR IGNORE INTO', '' ]; } - public function replace( $table, $uniqueKeys, $rows, $fname = __METHOD__ ) { - $this->nativeReplace( $table, $rows, $fname ); + protected function doReplace( $table, array $uniqueKeys, array $rows, $fname ) { + $encTable = $this->tableName( $table ); + list( $sqlColumns, $sqlTuples ) = $this->makeInsertLists( $rows ); + // https://sqlite.org/lang_insert.html + $this->query( "REPLACE INTO $encTable ($sqlColumns) VALUES $sqlTuples", $fname ); } /** diff --git a/tests/phpunit/includes/db/DatabaseTestHelper.php b/tests/phpunit/includes/db/DatabaseTestHelper.php index 08d3d9749a00..5952ef7f4608 100644 --- a/tests/phpunit/includes/db/DatabaseTestHelper.php +++ b/tests/phpunit/includes/db/DatabaseTestHelper.php @@ -43,6 +43,9 @@ class DatabaseTestHelper extends Database { */ protected $unionSupportsOrderAndLimit = true; + /** @var int[] */ + protected $forcedAffectedCountQueue = []; + public function __construct( 
$testName, array $opts = [] ) { parent::__construct( $opts + [ 'host' => null, @@ -166,11 +169,6 @@ class DatabaseTestHelper extends Database { return in_array( $table, (array)$this->tablesExists ); } - // Redeclare parent method to make it public - public function nativeReplace( $table, $rows, $fname ) { - parent::nativeReplace( $table, $rows, $fname ); - } - public function getType() { return 'test'; } @@ -254,6 +252,10 @@ class DatabaseTestHelper extends Database { return true; } + public function setNextQueryAffectedRowCounts( array $counts ) { + $this->forcedAffectedCountQueue = $counts; + } + protected function doQuery( $sql ) { $sql = preg_replace( '< /\* .+? \*/>', '', $sql ); $this->addSql( $sql ); @@ -268,6 +270,10 @@ class DatabaseTestHelper extends Database { $this->nextResult = []; $this->lastError = null; + if ( $this->forcedAffectedCountQueue ) { + $this->affectedRowCount = array_shift( $this->forcedAffectedCountQueue ); + } + return new FakeResultWrapper( $res ); } diff --git a/tests/phpunit/includes/libs/rdbms/database/DatabaseSQLTest.php b/tests/phpunit/includes/libs/rdbms/database/DatabaseSQLTest.php index 3e36676db658..076277c57a77 100644 --- a/tests/phpunit/includes/libs/rdbms/database/DatabaseSQLTest.php +++ b/tests/phpunit/includes/libs/rdbms/database/DatabaseSQLTest.php @@ -499,6 +499,8 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase { * @covers Wikimedia\Rdbms\Database::upsert */ public function testUpsert( $sql, $sqlText ) { + $this->database->setNextQueryAffectedRowCounts( [ 0 ] ); + $this->database->upsert( $sql['table'], $sql['rows'], @@ -515,18 +517,50 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase { [ 'table' => 'upsert_table', 'rows' => [ 'field' => 'text', 'field2' => 'text2' ], - 'uniqueIndexes' => [ 'field' ], + 'uniqueIndexes' => 'field', 'set' => [ 'field' => 'set' ], ], "BEGIN; " . "UPDATE upsert_table " . "SET field = 'set' " . - "WHERE ((field = 'text')); " . - "INSERT IGNORE INTO upsert_table " . 
+ "WHERE (field = 'text'); " . + "INSERT INTO upsert_table " . "(field,field2) " . "VALUES ('text','text2'); " . "COMMIT" ], + [ + [ + 'table' => 'upsert_table', + 'rows' => [ 'field' => 'text', 'field2' => 'text2' ], + 'uniqueIndexes' => [ [ 'field' ] ], + 'set' => [ 'field' => 'set' ], + ], + "BEGIN; " . + "UPDATE upsert_table " . + "SET field = 'set' " . + "WHERE (field = 'text'); " . + "INSERT INTO upsert_table " . + "(field,field2) " . + "VALUES ('text','text2'); " . + "COMMIT" + ], + [ + [ + 'table' => 'upsert_table', + 'rows' => [ 'fieldA' => 'text', 'fieldB' => 'more', 'field2' => 'text2' ], + 'uniqueIndexes' => [ [ 'fieldA', 'fieldB' ] ], + 'set' => [ 'field2' => 'set' ], + ], + "BEGIN; " . + "UPDATE upsert_table " . + "SET field2 = 'set' " . + "WHERE (fieldA = 'text' AND fieldB = 'more'); " . + "INSERT INTO upsert_table " . + "(fieldA,fieldB,field2) " . + "VALUES ('text','more','text2'); " . + "COMMIT" + ], ]; } @@ -580,7 +614,6 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase { /** * @dataProvider provideInsert * @covers Wikimedia\Rdbms\Database::insert - * @covers Wikimedia\Rdbms\Database::makeInsertOptions */ public function testInsert( $sql, $sqlText ) { $this->database->insert( @@ -634,7 +667,7 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase { /** * @dataProvider provideInsertSelect * @covers Wikimedia\Rdbms\Database::insertSelect - * @covers Wikimedia\Rdbms\Database::nativeInsertSelect + * @covers Wikimedia\Rdbms\Database::doInsertSelectNative */ public function testInsertSelect( $sql, $sqlTextNative, $sqlSelect, $sqlInsert ) { $this->database->insertSelect( @@ -746,7 +779,7 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase { /** * @covers Wikimedia\Rdbms\Database::insertSelect - * @covers Wikimedia\Rdbms\Database::nativeInsertSelect + * @covers Wikimedia\Rdbms\Database::doInsertSelectNative */ public function testInsertSelectBatching() { $dbWeb = new DatabaseTestHelper( __CLASS__, [ 'cliMode' => false ] ); @@ 
-860,12 +893,12 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase { ], ], "BEGIN; DELETE FROM module_deps " . - "WHERE (md_module = 'module') OR (md_skin = 'skin'); " . + "WHERE ((md_module = 'module') OR (md_skin = 'skin')); " . "INSERT INTO module_deps " . "(md_module,md_skin,md_deps) " . "VALUES ('module','skin','deps'); " . "DELETE FROM module_deps " . - "WHERE (md_module = 'module2') OR (md_skin = 'skin2'); " . + "WHERE ((md_module = 'module2') OR (md_skin = 'skin2')); " . "INSERT INTO module_deps " . "(md_module,md_skin,md_deps) " . "VALUES ('module2','skin2','deps2'); COMMIT" @@ -880,36 +913,9 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase { 'md_deps' => 'deps', ], ], - "BEGIN; INSERT INTO module_deps " . + "INSERT INTO module_deps " . "(md_module,md_skin,md_deps) " . - "VALUES ('module','skin','deps'); COMMIT" - ], - ]; - } - - /** - * @dataProvider provideNativeReplace - * @covers Wikimedia\Rdbms\Database::nativeReplace - */ - public function testNativeReplace( $sql, $sqlText ) { - $this->database->nativeReplace( - $sql['table'], - $sql['rows'], - __METHOD__ - ); - $this->assertLastSql( $sqlText ); - } - - public static function provideNativeReplace() { - return [ - [ - [ - 'table' => 'replace_table', - 'rows' => [ 'field' => 'text', 'field2' => 'text2' ], - ], - "REPLACE INTO replace_table " . - "(field,field2) " . 
- "VALUES ('text','text2')" + "VALUES ('module','skin','deps')" ], ]; } diff --git a/tests/phpunit/includes/db/DatabasePostgresTest.php b/tests/phpunit/integration/includes/db/DatabasePostgresTest.php index 4a9be12b9bed..481118db065b 100644 --- a/tests/phpunit/includes/db/DatabasePostgresTest.php +++ b/tests/phpunit/integration/includes/db/DatabasePostgresTest.php @@ -145,7 +145,7 @@ class DatabasePostgresTest extends MediaWikiTestCase { } /** - * @covers Wikimedia\Rdbms\DatabasePostgres::nativeInsertSelect + * @covers Wikimedia\Rdbms\DatabasePostgres::doInsertSelectNative */ public function testInsertSelectIgnoreOld() { if ( !$this->db instanceof DatabasePostgres ) { @@ -167,7 +167,7 @@ class DatabasePostgresTest extends MediaWikiTestCase { } /** - * @covers Wikimedia\Rdbms\DatabasePostgres::nativeInsertSelect + * @covers Wikimedia\Rdbms\DatabasePostgres::doInsertSelectNative */ public function testInsertSelectIgnoreNew() { if ( !$this->db instanceof DatabasePostgres ) { diff --git a/tests/phpunit/integration/includes/db/DatabaseSqliteTest.php b/tests/phpunit/integration/includes/db/DatabaseSqliteTest.php index b7f2c11a93b6..d23b121abaa7 100644 --- a/tests/phpunit/integration/includes/db/DatabaseSqliteTest.php +++ b/tests/phpunit/integration/includes/db/DatabaseSqliteTest.php @@ -591,13 +591,13 @@ class DatabaseSqliteTest extends \MediaWikiIntegrationTestCase { '3.7.11', 'a', [ 'a_1' => 1 ], - 'INSERT INTO a (a_1) VALUES (1);' + 'INSERT INTO a (a_1) VALUES (1);' ], [ '3.7.10', 'a', [ 'a_1' => 1 ], - 'INSERT INTO a (a_1) VALUES (1);' + 'INSERT INTO a (a_1) VALUES (1);' ], [ '3.7.11', @@ -606,7 +606,7 @@ class DatabaseSqliteTest extends \MediaWikiIntegrationTestCase { [ 'a_1' => 2 ], [ 'a_1' => 3 ] ], - 'INSERT INTO a (a_1) VALUES (2),(3);' + 'INSERT INTO a (a_1) VALUES (2),(3);' ], [ '3.7.10', @@ -616,8 +616,8 @@ class DatabaseSqliteTest extends \MediaWikiIntegrationTestCase { [ 'a_1' => 3 ] ], 'BEGIN;' . - 'INSERT INTO a (a_1) VALUES (2);' . 
- 'INSERT INTO a (a_1) VALUES (3);' . + 'INSERT INTO a (a_1) VALUES (2);' . + 'INSERT INTO a (a_1) VALUES (3);' . 'COMMIT;' ] ]; |