diff options
-rw-r--r-- | includes/libs/rdbms/database/Database.php | 557 | ||||
-rw-r--r-- | includes/libs/rdbms/database/DatabaseMysqlBase.php | 38 | ||||
-rw-r--r-- | includes/libs/rdbms/database/DatabasePostgres.php | 128 | ||||
-rw-r--r-- | includes/libs/rdbms/database/DatabaseSqlite.php | 17 | ||||
-rw-r--r-- | tests/phpunit/includes/db/DatabaseTestHelper.php | 16 | ||||
-rw-r--r-- | tests/phpunit/includes/libs/rdbms/database/DatabaseSQLTest.php | 80 | ||||
-rw-r--r-- | tests/phpunit/integration/includes/db/DatabasePostgresTest.php (renamed from tests/phpunit/includes/db/DatabasePostgresTest.php) | 4 | ||||
-rw-r--r-- | tests/phpunit/integration/includes/db/DatabaseSqliteTest.php | 10 |
8 files changed, 489 insertions, 361 deletions
diff --git a/includes/libs/rdbms/database/Database.php b/includes/libs/rdbms/database/Database.php index e1bbd50bb3a4..be1eee6d547b 100644 --- a/includes/libs/rdbms/database/Database.php +++ b/includes/libs/rdbms/database/Database.php @@ -1653,10 +1653,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware throw new DBUnexpectedError( $this, "Cannot use a * field: got '$var'" ); } - if ( !is_array( $options ) ) { - $options = [ $options ]; - } - + $options = $this->normalizeOptions( $options ); $options['LIMIT'] = 1; $res = $this->select( $table, $var, $cond, $fname, $options, $join_conds ); @@ -1681,10 +1678,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware throw new DBUnexpectedError( $this, "Cannot use an array of fields" ); } - if ( !is_array( $options ) ) { - $options = [ $options ]; - } - + $options = $this->normalizeOptions( $options ); $res = $this->select( $table, [ 'value' => $var ], $cond, $fname, $options, $join_conds ); if ( $res === false ) { throw new DBUnexpectedError( $this, "Got false from select()" ); @@ -2020,9 +2014,35 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware } /** + * @param array $rowOrRows A single (field => value) map or a list of such maps + * @return array[] List of (field => value) maps + * @since 1.35 + */ + final protected function normalizeRowArray( array $rowOrRows ) { + if ( !$rowOrRows ) { + $rows = []; + } elseif ( isset( $rowOrRows[0] ) ) { + $rows = $rowOrRows; + } else { + $rows = [ $rowOrRows ]; + } + + foreach ( $rows as $row ) { + if ( !is_array( $row ) ) { + throw new DBUnexpectedError( $this, "Got non-array in row array" ); + } elseif ( !$row ) { + throw new DBUnexpectedError( $this, "Got empty array in row array" ); + } + } + + return $rows; + } + + /** * @param array|string $conds * @param string $fname * @return array + * @since 1.31 */ final protected function normalizeConditions( $conds, $fname ) { if ( $conds === null || $conds === false ) { @@ -2032,20 +2052,100 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware . $fname . ' with incorrect parameters: $conds must be a string or an array' ); - $conds = ''; + return []; + } elseif ( $conds === '' ) { + return []; + } + + return is_array( $conds ) ? $conds : [ $conds ]; + } + + /** + * @param string|string[]|string[][] $uniqueKeys Unique indexes (first is identity key) + * @return string[][] Unique indexes as column lists (first index is the identity key) + * @since 1.35 + */ + final protected function normalizeUpsertKeys( $uniqueKeys ) { + if ( is_string( $uniqueKeys ) ) { + return [ [ $uniqueKeys ] ]; + } + + if ( !is_array( $uniqueKeys ) || !$uniqueKeys ) { + throw new DBUnexpectedError( $this, 'Invalid or empty unique key array' ); + } + + $oldStyle = false; + $uniqueColumnSets = []; + foreach ( $uniqueKeys as $i => $uniqueKey ) { + if ( !is_int( $i ) ) { + throw new DBUnexpectedError( $this, 'Unique key array should be a list' ); + } elseif ( is_string( $uniqueKey ) ) { + $oldStyle = true; + $uniqueColumnSets[] = [ $uniqueKey ]; + } elseif ( is_array( $uniqueKey ) && $uniqueKey ) { + $uniqueColumnSets[] = $uniqueKey; + } else { + throw new DBUnexpectedError( $this, 'Invalid unique key array entry' ); + } + } + + if ( count( $uniqueColumnSets ) > 1 ) { + // If an existing row conflicts with new row X on key A and new row Y on key B, + // it is not well defined how many UPDATEs should apply to the existing row and + // in what order the new rows are checked + $this->queryLogger->warning( + __METHOD__ . " called with multiple unique keys", + [ 'exception' => new RuntimeException() ] + ); + } + + if ( $oldStyle ) { + // Passing a list of strings for single-column unique keys is too + // easily confused with passing the columns of composite unique key + $this->queryLogger->warning( + __METHOD__ . " called with deprecated parameter style: " . + "the unique key array should be a string or array of string arrays", + [ 'exception' => new RuntimeException() ] + ); + } + + return $uniqueColumnSets; + } + + /** + * @param string|array $options + * @return array Combination option/value map and boolean option list + * @since 1.35 + */ + final protected function normalizeOptions( $options ) { + if ( is_array( $options ) ) { + return $options; + } elseif ( is_string( $options ) ) { + return ( $options === '' ) ? [] : [ $options ]; + } else { + throw new DBUnexpectedError( $this, __METHOD__ . ': expected string or array' ); } + } - if ( !is_array( $conds ) ) { - $conds = ( $conds === '' ) ? [] : [ $conds ]; + /** + * @param string $option Query option flag (e.g. "IGNORE" or "FOR UPDATE") + * @param array $options Combination option/value map and boolean option list + * @return bool Whether the option appears as an integer-keyed value in the options + * @since 1.35 + */ + final protected function isFlagInOptions( $option, array $options ) { + foreach ( array_keys( $options, $option, true ) as $k ) { + if ( is_int( $k ) ) { + return true; + } } - return $conds; + return false; } /** * @param array|string $var Field parameter in the style of select() * @return string|null Column name or null; ignores aliases - * @throws DBUnexpectedError Errors out if multiple columns are given */ final protected function extractSingleFieldFromList( $var ) { if ( is_array( $var ) ) { @@ -2110,65 +2210,100 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware return !$indexInfo[0]->Non_unique; } - /** - * Helper for Database::insert(). - * - * @param array $options - * @return string - */ - protected function makeInsertOptions( $options ) { - return implode( ' ', $options ); + public function insert( $table, $rows, $fname = __METHOD__, $options = [] ) { + $rows = $this->normalizeRowArray( $rows ); + if ( !$rows ) { + return true; + } + + $options = $this->normalizeOptions( $options ); + if ( $this->isFlagInOptions( 'IGNORE', $options ) ) { + $this->doInsertNonConflicting( $table, $rows, $fname ); + } else { + $this->doInsert( $table, $rows, $fname ); + } + + return true; } /** - * @param array $a A single (field => value) map or a list of such maps - * @return bool + * @see Database::insert() + * @param string $table + * @param array $rows Non-empty list of rows + * @param string $fname + * @since 1.35 */ - final protected function isMultiRowArray( array $a ) { - return ( isset( $a[0] ) && is_array( $a[0] ) ); + protected function doInsert( $table, array $rows, $fname ) { + $encTable = $this->tableName( $table ); + list( $sqlColumns, $sqlTuples ) = $this->makeInsertLists( $rows ); + + $sql = "INSERT INTO $encTable ($sqlColumns) VALUES $sqlTuples"; + + $this->query( $sql, $fname ); } - public function insert( $table, $rows, $fname = __METHOD__, $options = [] ) { - # No rows to insert, easy just return now - if ( !count( $rows ) ) { - return true; - } + /** + * @see Database::insert() + * @param string $table + * @param array $rows Non-empty list of rows + * @param string $fname + * @since 1.35 + */ + protected function doInsertNonConflicting( $table, array $rows, $fname ) { + $encTable = $this->tableName( $table ); + list( $sqlColumns, $sqlTuples ) = $this->makeInsertLists( $rows ); + list( $sqlVerb, $sqlOpts ) = $this->makeInsertNonConflictingVerbAndOptions(); - $table = $this->tableName( $table ); + $sql = rtrim( "$sqlVerb $encTable ($sqlColumns) VALUES $sqlTuples $sqlOpts" ); - if ( !is_array( $options ) ) { - $options = [ $options ]; - } + $this->query( $sql, $fname ); + } - $options = $this->makeInsertOptions( $options ); + /** + * @return string[] ("INSERT"-style SQL verb, "ON CONFLICT"-style clause or "") + * @since 1.35 + */ + protected function makeInsertNonConflictingVerbAndOptions() { + return [ 'INSERT IGNORE INTO', '' ]; + } - $multi = $this->isMultiRowArray( $rows ); - if ( $multi ) { - $keys = array_keys( $rows[0] ); - } else { - $keys = array_keys( $rows ); + /** + * Make SQL lists of columns, row tuples for INSERT/VALUES expressions + * + * The tuple column order is that of the columns of the first provided row. + * The provided rows must have exactly the same keys and ordering thereof. + * + * @param array[] $rows Non-empty list of (column => value) maps + * @return array (comma-separated columns, comma-separated tuples) + * @since 1.35 + */ + protected function makeInsertLists( array $rows ) { + $firstRow = $rows[0]; + if ( !is_array( $firstRow ) || !$firstRow ) { + throw new DBUnexpectedError( $this, 'Got an empty row list or empty row' ); } + // List of columns that define the value tuple ordering + $tupleColumns = array_keys( $firstRow ); - $sql = 'INSERT ' . $options . - " INTO $table (" . implode( ',', $keys ) . ') VALUES '; - - if ( $multi ) { - $first = true; - foreach ( $rows as $row ) { - if ( $first ) { - $first = false; - } else { - $sql .= ','; - } - $sql .= '(' . $this->makeList( $row ) . ')'; + $valueTuples = []; + foreach ( $rows as $row ) { + $rowColumns = array_keys( $row ); + // VALUES(...) requires a uniform correspondance of (column => value) + if ( $rowColumns !== $tupleColumns ) { + throw new DBUnexpectedError( + $this, + 'Got row columns (' . implode( ', ', $rowColumns ) . ') ' . + 'instead of expected (' . implode( ', ', $tupleColumns ) . ')' + ); } - } else { - $sql .= '(' . $this->makeList( $rows ) . ')'; + // Make the value tuple that defines this row + $valueTuples[] = '(' . $this->makeList( $row, self::LIST_COMMA ) . ')'; } - $this->query( $sql, $fname ); - - return true; + return [ + $this->makeList( $tupleColumns, self::LIST_NAMES ), + implode( ',', $valueTuples ) + ]; } /** @@ -2178,9 +2313,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware * @return array */ protected function makeUpdateOptionsArray( $options ) { - if ( !is_array( $options ) ) { - $options = [ $options ]; - } + $options = $this->normalizeOptions( $options ); $opts = []; @@ -2894,137 +3027,164 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware } public function replace( $table, $uniqueKeys, $rows, $fname = __METHOD__ ) { - if ( count( $rows ) == 0 ) { + $rows = $this->normalizeRowArray( $rows ); + if ( !$rows ) { return; } - $uniqueKeys = (array)$uniqueKeys; - // Single row case - if ( !is_array( reset( $rows ) ) ) { - $rows = [ $rows ]; + if ( $uniqueKeys ) { + $uniqueKeys = $this->normalizeUpsertKeys( $uniqueKeys ); + $this->doReplace( $table, $uniqueKeys, $rows, $fname ); + } else { + $this->queryLogger->warning( + __METHOD__ . " called with no unique keys", + [ 'exception' => new RuntimeException() ] + ); + $this->doInsert( $table, $rows, $fname ); } + } + /** + * @see Database::replace() + * @param string $table + * @param string[][] $uniqueKeys Non-empty list of unique keys + * @param array $rows Non-empty list of rows + * @param string $fname + * @since 1.35 + */ + protected function doReplace( $table, array $uniqueKeys, array $rows, $fname ) { + $affectedRowCount = 0; + $this->startAtomic( $fname, self::ATOMIC_CANCELABLE ); try { - $this->startAtomic( $fname, self::ATOMIC_CANCELABLE ); - $affectedRowCount = 0; foreach ( $rows as $row ) { - // Delete rows which collide with this one - $indexWhereClauses = []; - foreach ( $uniqueKeys as $index ) { - $indexColumns = (array)$index; - $indexRowValues = array_intersect_key( $row, array_flip( $indexColumns ) ); - if ( count( $indexRowValues ) != count( $indexColumns ) ) { - throw new DBUnexpectedError( - $this, - 'New record does not provide all values for unique key (' . - implode( ', ', $indexColumns ) . ')' - ); - } elseif ( in_array( null, $indexRowValues, true ) ) { - throw new DBUnexpectedError( - $this, - 'New record has a null value for unique key (' . - implode( ', ', $indexColumns ) . ')' - ); - } - $indexWhereClauses[] = $this->makeList( $indexRowValues, LIST_AND ); - } - - if ( $indexWhereClauses ) { - $this->delete( $table, $this->makeList( $indexWhereClauses, LIST_OR ), $fname ); - $affectedRowCount += $this->affectedRows(); - } - + // Delete any conflicting rows (including ones inserted from $rows) + $sqlCondition = $this->makeConditionCollidesUponKeys( [ $row ], $uniqueKeys ); + $this->delete( $table, [ $sqlCondition ], $fname ); + $affectedRowCount += $this->affectedRows(); // Now insert the row $this->insert( $table, $row, $fname ); $affectedRowCount += $this->affectedRows(); } $this->endAtomic( $fname ); - $this->affectedRowCount = $affectedRowCount; } catch ( Exception $e ) { $this->cancelAtomic( $fname ); throw $e; } + $this->affectedRowCount = $affectedRowCount; } /** - * REPLACE query wrapper for MySQL and SQLite, which have a native REPLACE - * statement. - * - * @param string $table Table name - * @param array|string $rows Row(s) to insert - * @param string $fname Caller function name + * @param array[] $rows Non-empty list of rows + * @param string[] $uniqueKey List of columns that define a single unique index + * @return string SQL conditions to filter existing rows to those with counterparts in $rows */ - protected function nativeReplace( $table, $rows, $fname ) { - $table = $this->tableName( $table ); + private function makeConditionCollidesUponKey( array $rows, array $uniqueKey ) { + if ( !$rows ) { + throw new DBUnexpectedError( $this, "Empty row array" ); + } elseif ( !$uniqueKey ) { + throw new DBUnexpectedError( $this, "Empty unique key array" ); + } + + if ( count( $uniqueKey ) == 1 ) { + // Use a simple IN(...) clause + $column = reset( $uniqueKey ); + $values = array_column( $rows, $column ); + if ( count( $values ) !== count( $rows ) ) { + throw new DBUnexpectedError( $this, "Missing values for unique key ($column)" ); + } - # Single row case - if ( !is_array( reset( $rows ) ) ) { - $rows = [ $rows ]; + return $this->makeList( [ $column => $values ], self::LIST_AND ); } - $sql = "REPLACE INTO $table (" . implode( ',', array_keys( $rows[0] ) ) . ') VALUES '; - $first = true; - + $disjunctions = []; foreach ( $rows as $row ) { - if ( $first ) { - $first = false; - } else { - $sql .= ','; + $rowKeyMap = array_intersect_key( $row, array_flip( $uniqueKey ) ); + if ( count( $rowKeyMap ) != count( $uniqueKey ) ) { + throw new DBUnexpectedError( + $this, + "Missing values for unique key (" . implode( ',', $uniqueKey ) . ")" + ); } - - $sql .= '(' . $this->makeList( $row ) . ')'; + $disjunctions[] = $this->makeList( $rowKeyMap, self::LIST_AND ); } - $this->query( $sql, $fname ); + return count( $disjunctions ) > 1 + ? $this->makeList( $disjunctions, self::LIST_OR ) + : $disjunctions[0]; } - public function upsert( $table, array $rows, $uniqueKeys, array $set, $fname = __METHOD__ ) { - if ( $rows === [] ) { - return true; // nothing to do + /** + * @param array[] $rows Non-empty list of rows + * @param string[][] $uniqueKeys List of column lists that each define a unique index + * @return string SQL conditions to filter existing rows to those with counterparts in $rows + * @since 1.35 + */ + final protected function makeConditionCollidesUponKeys( array $rows, array $uniqueKeys ) { + if ( !$uniqueKeys ) { + throw new DBUnexpectedError( $this, "Empty unique key array" ); } - $uniqueKeys = (array)$uniqueKeys; - if ( !is_array( reset( $rows ) ) ) { - $rows = [ $rows ]; + $disjunctions = []; + foreach ( $uniqueKeys as $uniqueKey ) { + $disjunctions[] = $this->makeConditionCollidesUponKey( $rows, $uniqueKey ); } - '@phan-var array[] $rows'; - if ( count( $uniqueKeys ) ) { - $clauses = []; // list WHERE clauses that each identify a single row - foreach ( $rows as $row ) { - foreach ( $uniqueKeys as $index ) { - $index = is_array( $index ) ? $index : [ $index ]; // columns - $rowKey = []; // unique key to this row - foreach ( $index as $column ) { - $rowKey[$column] = $row[$column]; - } - $clauses[] = $this->makeList( $rowKey, self::LIST_AND ); - } - } - $where = [ $this->makeList( $clauses, self::LIST_OR ) ]; + return count( $disjunctions ) > 1 + ? $this->makeList( $disjunctions, self::LIST_OR ) + : $disjunctions[0]; + } + + public function upsert( $table, array $rows, $uniqueKeys, array $set, $fname = __METHOD__ ) { + $rows = $this->normalizeRowArray( $rows ); + if ( !$rows ) { + return true; + } + + if ( $uniqueKeys ) { + $uniqueKeys = $this->normalizeUpsertKeys( $uniqueKeys ); + $this->doUpsert( $table, $rows, $uniqueKeys, $set, $fname ); } else { - $where = false; + $this->queryLogger->warning( + __METHOD__ . " called with no unique keys", + [ 'exception' => new RuntimeException() ] + ); + $this->doInsert( $table, $rows, $fname ); } + return true; + } + + /** + * @see Database::upsert() + * @param string $table + * @param array[] $rows Non-empty list of rows + * @param string[][] $uniqueKeys Non-empty list of unique keys + * @param array $set + * @param string $fname + * @since 1.35 + */ + protected function doUpsert( $table, array $rows, array $uniqueKeys, array $set, $fname ) { $affectedRowCount = 0; + $this->startAtomic( $fname, self::ATOMIC_CANCELABLE ); try { - $this->startAtomic( $fname, self::ATOMIC_CANCELABLE ); - # Update any existing conflicting row(s) - if ( $where !== false ) { - $this->update( $table, $set, $where, $fname ); - $affectedRowCount += $this->affectedRows(); + foreach ( $rows as $row ) { + // Update any existing conflicting rows (including ones inserted from $rows) + $sqlConditions = $this->makeConditionCollidesUponKeys( [ $row ], $uniqueKeys ); + $this->update( $table, $set, [ $sqlConditions ], $fname ); + $rowsUpdated = $this->affectedRows(); + $affectedRowCount += $rowsUpdated; + if ( $rowsUpdated <= 0 ) { + // Now insert the row if there are no conflicts + $this->insert( $table, $row, $fname ); + $affectedRowCount += $this->affectedRows(); + } } - # Now insert any non-conflicting row(s) - $this->insert( $table, $rows, $fname, [ 'IGNORE' ] ); - $affectedRowCount += $this->affectedRows(); $this->endAtomic( $fname ); - $this->affectedRowCount = $affectedRowCount; } catch ( Exception $e ) { $this->cancelAtomic( $fname ); throw $e; } - - return true; + $this->affectedRowCount = $affectedRowCount; } public function deleteJoin( $delTable, $joinTable, $delVar, $joinVar, $conds, @@ -3083,18 +3243,24 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware } final public function insertSelect( - $destTable, $srcTable, $varMap, $conds, - $fname = __METHOD__, $insertOptions = [], $selectOptions = [], $selectJoinConds = [] + $destTable, + $srcTable, + $varMap, + $conds, + $fname = __METHOD__, + $insertOptions = [], + $selectOptions = [], + $selectJoinConds = [] ) { static $hints = [ 'NO_AUTO_COLUMNS' ]; - $insertOptions = (array)$insertOptions; - $selectOptions = (array)$selectOptions; + $insertOptions = $this->normalizeOptions( $insertOptions ); + $selectOptions = $this->normalizeOptions( $selectOptions ); if ( $this->cliMode && $this->isInsertSelectSafe( $insertOptions, $selectOptions ) ) { // For massive migrations with downtime, we don't want to select everything // into memory and OOM, so do all this native on the server side if possible. - $this->nativeInsertSelect( + $this->doInsertSelectNative( $destTable, $srcTable, $varMap, @@ -3105,7 +3271,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware $selectJoinConds ); } else { - $this->nonNativeInsertSelect( + $this->doInsertSelectGeneric( $destTable, $srcTable, $varMap, @@ -3134,7 +3300,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware * Implementation of insertSelect() based on select() and insert() * * @see IDatabase::insertSelect() - * @since 1.30 * @param string $destTable * @param string|array $srcTable * @param array $varMap @@ -3143,10 +3308,17 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware * @param array $insertOptions * @param array $selectOptions * @param array $selectJoinConds + * @since 1.35 */ - protected function nonNativeInsertSelect( $destTable, $srcTable, $varMap, $conds, - $fname = __METHOD__, - $insertOptions = [], $selectOptions = [], $selectJoinConds = [] + protected function doInsertSelectGeneric( + $destTable, + $srcTable, + array $varMap, + $conds, + $fname, + array $insertOptions, + array $selectOptions, + $selectJoinConds ) { // For web requests, do a locking SELECT and then INSERT. This puts the SELECT burden // on only the master (without needing row-based-replication). It also makes it easy to @@ -3155,48 +3327,37 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware foreach ( $varMap as $dstColumn => $sourceColumnOrSql ) { $fields[] = $this->fieldNameWithAlias( $sourceColumnOrSql, $dstColumn ); } - $selectOptions[] = 'FOR UPDATE'; $res = $this->select( - $srcTable, implode( ',', $fields ), $conds, $fname, $selectOptions, $selectJoinConds + $srcTable, + implode( ',', $fields ), + $conds, + $fname, + array_merge( $selectOptions, [ 'FOR UPDATE' ] ), + $selectJoinConds ); if ( !$res ) { return; } + $affectedRowCount = 0; + $this->startAtomic( $fname, self::ATOMIC_CANCELABLE ); try { - $affectedRowCount = 0; - $this->startAtomic( $fname, self::ATOMIC_CANCELABLE ); $rows = []; - $ok = true; foreach ( $res as $row ) { $rows[] = (array)$row; - - // Avoid inserts that are too huge - if ( count( $rows ) >= $this->nonNativeInsertSelectBatchSize ) { - $ok = $this->insert( $destTable, $rows, $fname, $insertOptions ); - if ( !$ok ) { - break; - } - $affectedRowCount += $this->affectedRows(); - $rows = []; - } } - if ( $rows && $ok ) { - $ok = $this->insert( $destTable, $rows, $fname, $insertOptions ); - if ( $ok ) { - $affectedRowCount += $this->affectedRows(); - } - } - if ( $ok ) { - $this->endAtomic( $fname ); - $this->affectedRowCount = $affectedRowCount; - } else { - $this->cancelAtomic( $fname ); + // Avoid inserts that are too huge + $rowBatches = array_chunk( $rows, $this->nonNativeInsertSelectBatchSize ); + foreach ( $rowBatches as $rows ) { + $this->insert( $destTable, $rows, $fname, $insertOptions ); + $affectedRowCount += $this->affectedRows(); } } catch ( Exception $e ) { $this->cancelAtomic( $fname ); throw $e; } + $this->endAtomic( $fname ); + $this->affectedRowCount = $affectedRowCount; } /** @@ -3212,19 +3373,23 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware * @param array $insertOptions * @param array $selectOptions * @param array $selectJoinConds + * @since 1.35 */ - protected function nativeInsertSelect( $destTable, $srcTable, $varMap, $conds, - $fname = __METHOD__, - $insertOptions = [], $selectOptions = [], $selectJoinConds = [] + protected function doInsertSelectNative( + $destTable, + $srcTable, + array $varMap, + $conds, + $fname, + array $insertOptions, + array $selectOptions, + $selectJoinConds ) { - $destTable = $this->tableName( $destTable ); - - if ( !is_array( $insertOptions ) ) { - $insertOptions = [ $insertOptions ]; - } - - $insertOptions = $this->makeInsertOptions( $insertOptions ); - + list( $sqlVerb, $sqlOpts ) = $this->isFlagInOptions( 'IGNORE', $insertOptions ) + ? $this->makeInsertNonConflictingVerbAndOptions() + : [ 'INSERT INTO', '' ]; + $encDstTable = $this->tableName( $destTable ); + $sqlDstColumns = implode( ',', array_keys( $varMap ) ); $selectSql = $this->selectSQLText( $srcTable, array_values( $varMap ), @@ -3234,9 +3399,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware $selectJoinConds ); - $sql = "INSERT $insertOptions" . - " INTO $destTable (" . implode( ',', array_keys( $varMap ) ) . ') ' . - $selectSql; + $sql = rtrim( "$sqlVerb $encDstTable ($sqlDstColumns) $selectSql $sqlOpts" ); $this->query( $sql, $fname ); } diff --git a/includes/libs/rdbms/database/DatabaseMysqlBase.php b/includes/libs/rdbms/database/DatabaseMysqlBase.php index 353d1a09bf3a..614a756ab4c8 100644 --- a/includes/libs/rdbms/database/DatabaseMysqlBase.php +++ b/includes/libs/rdbms/database/DatabaseMysqlBase.php @@ -448,10 +448,6 @@ abstract class DatabaseMysqlBase extends Database { return in_array( $errno, [ 2062, 3024 ] ); } - public function replace( $table, $uniqueKeys, $rows, $fname = __METHOD__ ) { - $this->nativeReplace( $table, $rows, $fname ); - } - protected function isInsertSelectSafe( array $insertOptions, array $selectOptions ) { $row = $this->getReplicationSafetyInfo(); // For row-based-replication, the resulting changes will be relayed, not the query @@ -1324,31 +1320,25 @@ abstract class DatabaseMysqlBase extends Database { $this->query( $sql, $fname ); } - public function upsert( - $table, array $rows, $uniqueKeys, array $set, $fname = __METHOD__ - ) { - if ( $rows === [] ) { - return true; // nothing to do - } + protected function doUpsert( $table, array $rows, array $uniqueKeys, array $set, $fname ) { + $encTable = $this->tableName( $table ); + list( $sqlColumns, $sqlTuples ) = $this->makeInsertLists( $rows ); + $sqlColumnAssignments = $this->makeList( $set, self::LIST_SET ); - if ( !is_array( reset( $rows ) ) ) { - $rows = [ $rows ]; - } + $sql = + "INSERT INTO $encTable ($sqlColumns) VALUES $sqlTuples " . + "ON DUPLICATE KEY UPDATE $sqlColumnAssignments"; - $table = $this->tableName( $table ); - $columns = array_keys( $rows[0] ); + $this->query( $sql, $fname ); + } - $sql = "INSERT INTO $table (" . implode( ',', $columns ) . ') VALUES '; - $rowTuples = []; - foreach ( $rows as $row ) { - $rowTuples[] = '(' . $this->makeList( $row ) . ')'; - } - $sql .= implode( ',', $rowTuples ); - $sql .= " ON DUPLICATE KEY UPDATE " . $this->makeList( $set, self::LIST_SET ); + protected function doReplace( $table, array $uniqueKeys, array $rows, $fname ) { + $encTable = $this->tableName( $table ); + list( $sqlColumns, $sqlTuples ) = $this->makeInsertLists( $rows ); - $this->query( $sql, $fname ); + $sql = "REPLACE INTO $encTable ($sqlColumns) VALUES $sqlTuples"; - return true; + $this->query( $sql, $fname ); } /** diff --git a/includes/libs/rdbms/database/DatabasePostgres.php b/includes/libs/rdbms/database/DatabasePostgres.php index c16ab403e02f..66d71eabeb7d 100644 --- a/includes/libs/rdbms/database/DatabasePostgres.php +++ b/includes/libs/rdbms/database/DatabasePostgres.php @@ -542,93 +542,53 @@ __INDEXATTR__; return parent::selectSQLText( $table, $vars, $conds, $fname, $options, $join_conds ); } - /** @inheritDoc */ - public function insert( $table, $args, $fname = __METHOD__, $options = [] ) { - if ( !count( $args ) ) { - return true; - } + protected function makeInsertNonConflictingVerbAndOptions() { + return [ 'INSERT INTO', 'ON CONFLICT DO NOTHING' ]; + } - $table = $this->tableName( $table ); - if ( !isset( $this->numericVersion ) ) { - $this->getServerVersion(); - } + public function doInsertNonConflicting( $table, array $rows, $fname ) { + // Postgres 9.5 supports "ON CONFLICT" + if ( $this->getServerVersion() >= 9.5 ) { + parent::doInsertNonConflicting( $table, $rows, $fname ); - if ( !is_array( $options ) ) { - $options = [ $options ]; + return; } - if ( isset( $args[0] ) && is_array( $args[0] ) ) { - $rows = $args; - $keys = array_keys( $args[0] ); - } else { - $rows = [ $args ]; - $keys = array_keys( $args ); - } - - $ignore = in_array( 'IGNORE', $options ); - - $sql = "INSERT INTO $table (" . implode( ',', $keys ) . ') VALUES '; - - if ( $this->numericVersion >= 9.5 || !$ignore ) { - // No IGNORE or our PG has "ON CONFLICT DO NOTHING" - $first = true; + $affectedRowCount = 0; + // Emulate INSERT IGNORE via savepoints/rollback + $tok = $this->startAtomic( "$fname (outer)", self::ATOMIC_CANCELABLE ); + try { + $encTable = $this->tableName( $table ); foreach ( $rows as $row ) { - if ( $first ) { - $first = false; - } else { - $sql .= ','; - } - $sql .= '(' . $this->makeList( $row ) . ')'; - } - if ( $ignore ) { - $sql .= ' ON CONFLICT DO NOTHING'; - } - $this->query( $sql, $fname ); - } else { - // Emulate IGNORE by doing each row individually, with savepoints - // to roll back as necessary. - $numrowsinserted = 0; - - $tok = $this->startAtomic( "$fname (outer)", self::ATOMIC_CANCELABLE ); - try { - foreach ( $rows as $row ) { - $tempsql = $sql; - $tempsql .= '(' . $this->makeList( $row ) . ')'; - - $this->startAtomic( "$fname (inner)", self::ATOMIC_CANCELABLE ); - try { - $this->query( $tempsql, $fname ); - $this->endAtomic( "$fname (inner)" ); - $numrowsinserted++; - } catch ( DBQueryError $e ) { - $this->cancelAtomic( "$fname (inner)" ); - // Our IGNORE is supposed to ignore duplicate key errors, but not others. - // (even though MySQL's version apparently ignores all errors) - if ( $e->errno !== '23505' ) { - throw $e; - } + list( $sqlColumns, $sqlTuples ) = $this->makeInsertLists( [ $row ] ); + $tempsql = "INSERT INTO $encTable ($sqlColumns) VALUES ($sqlTuples)"; + + $this->startAtomic( "$fname (inner)", self::ATOMIC_CANCELABLE ); + try { + $this->query( $tempsql, $fname ); + $this->endAtomic( "$fname (inner)" ); + $affectedRowCount++; + } catch ( DBQueryError $e ) { + $this->cancelAtomic( "$fname (inner)" ); + // Our IGNORE is supposed to ignore duplicate key errors, but not others. + // (even though MySQL's version apparently ignores all errors) + if ( $e->errno !== '23505' ) { + throw $e; } } - } catch ( Exception $e ) { - $this->cancelAtomic( "$fname (outer)", $tok ); - throw $e; } - $this->endAtomic( "$fname (outer)" ); - - // Set the affected row count for the whole operation - $this->affectedRowCount = $numrowsinserted; + } catch ( Exception $e ) { + $this->cancelAtomic( "$fname (outer)", $tok ); + throw $e; } - - return true; + $this->endAtomic( "$fname (outer)" ); + // Set the affected row count for the whole operation + $this->affectedRowCount = $affectedRowCount; } protected function makeUpdateOptionsArray( $options ) { - if ( !is_array( $options ) ) { - $options = [ $options ]; - } - - // PostgreSQL doesn't support anything like "ignore" for - // UPDATE. + $options = $this->normalizeOptions( $options ); + // PostgreSQL doesn't support anything like "ignore" for UPDATE. $options = array_diff( $options, [ 'IGNORE' ] ); return parent::makeUpdateOptionsArray( $options ); @@ -652,9 +612,15 @@ __INDEXATTR__; * @param array $selectOptions * @param array $selectJoinConds */ - protected function nativeInsertSelect( - $destTable, $srcTable, $varMap, $conds, $fname = __METHOD__, - $insertOptions = [], $selectOptions = [], $selectJoinConds = [] + protected function doInsertSelectNative( + $destTable, + $srcTable, + array $varMap, + $conds, + $fname, + array $insertOptions, + array $selectOptions, + $selectJoinConds ) { if ( !is_array( $insertOptions ) ) { $insertOptions = [ $insertOptions ]; @@ -662,7 +628,7 @@ __INDEXATTR__; if ( in_array( 'IGNORE', $insertOptions ) ) { if ( $this->getServerVersion() >= 9.5 ) { - // Use ON CONFLICT DO NOTHING if we have it for IGNORE + // Use "ON CONFLICT DO" if we have it for IGNORE $destTable = $this->tableName( $destTable ); $selectSql = $this->selectSQLText( @@ -680,13 +646,13 @@ __INDEXATTR__; $this->query( $sql, $fname ); } else { // IGNORE and we don't have ON CONFLICT DO NOTHING, so just use the non-native version - $this->nonNativeInsertSelect( + $this->doInsertSelectGeneric( $destTable, $srcTable, $varMap, $conds, $fname, $insertOptions, $selectOptions, $selectJoinConds ); } } else { - parent::nativeInsertSelect( $destTable, $srcTable, $varMap, $conds, $fname, + parent::doInsertSelectNative( $destTable, $srcTable, $varMap, $conds, $fname, $insertOptions, $selectOptions, $selectJoinConds ); } } diff --git a/includes/libs/rdbms/database/DatabaseSqlite.php b/includes/libs/rdbms/database/DatabaseSqlite.php index bbf58570ec09..0671eb9af673 100644 --- a/includes/libs/rdbms/database/DatabaseSqlite.php +++ b/includes/libs/rdbms/database/DatabaseSqlite.php @@ -661,18 +661,15 @@ class DatabaseSqlite extends Database { return $options; } - /** - * @param array $options - * @return string - */ - protected function makeInsertOptions( $options ) { - $options = self::rewriteIgnoreKeyword( $options ); - - return parent::makeInsertOptions( $options ); + protected function makeInsertNonConflictingVerbAndOptions() { + return [ 'INSERT OR IGNORE INTO', '' ]; } - public function replace( $table, $uniqueKeys, $rows, $fname = __METHOD__ ) { - $this->nativeReplace( $table, $rows, $fname ); + protected function doReplace( $table, array $uniqueKeys, array $rows, $fname ) { + $encTable = $this->tableName( $table ); + list( $sqlColumns, $sqlTuples ) = $this->makeInsertLists( $rows ); + // https://sqlite.org/lang_insert.html + $this->query( "REPLACE INTO $encTable ($sqlColumns) VALUES $sqlTuples", $fname ); } /** diff --git a/tests/phpunit/includes/db/DatabaseTestHelper.php b/tests/phpunit/includes/db/DatabaseTestHelper.php index 08d3d9749a00..5952ef7f4608 100644 --- a/tests/phpunit/includes/db/DatabaseTestHelper.php +++ b/tests/phpunit/includes/db/DatabaseTestHelper.php @@ -43,6 +43,9 @@ class DatabaseTestHelper extends Database { */ protected $unionSupportsOrderAndLimit = true; + /** @var int[] */ + protected $forcedAffectedCountQueue = []; + public function __construct( $testName, array $opts = [] ) { parent::__construct( $opts + [ 'host' => null, @@ -166,11 +169,6 @@ class DatabaseTestHelper extends Database { return in_array( $table, (array)$this->tablesExists ); } - // Redeclare parent method to make it public - public function nativeReplace( $table, $rows, $fname ) { - parent::nativeReplace( $table, $rows, $fname ); - } - public function getType() { return 'test'; } @@ -254,6 +252,10 @@ class DatabaseTestHelper extends Database { return true; } + public function setNextQueryAffectedRowCounts( array $counts ) { + $this->forcedAffectedCountQueue = $counts; + } + protected function doQuery( $sql ) { $sql = preg_replace( '< /\* .+? \*/>', '', $sql ); $this->addSql( $sql ); @@ -268,6 +270,10 @@ class DatabaseTestHelper extends Database { $this->nextResult = []; $this->lastError = null; + if ( $this->forcedAffectedCountQueue ) { + $this->affectedRowCount = array_shift( $this->forcedAffectedCountQueue ); + } + return new FakeResultWrapper( $res ); } diff --git a/tests/phpunit/includes/libs/rdbms/database/DatabaseSQLTest.php b/tests/phpunit/includes/libs/rdbms/database/DatabaseSQLTest.php index 3e36676db658..076277c57a77 100644 --- a/tests/phpunit/includes/libs/rdbms/database/DatabaseSQLTest.php +++ b/tests/phpunit/includes/libs/rdbms/database/DatabaseSQLTest.php @@ -499,6 +499,8 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase { * @covers Wikimedia\Rdbms\Database::upsert */ public function testUpsert( $sql, $sqlText ) { + $this->database->setNextQueryAffectedRowCounts( [ 0 ] ); + $this->database->upsert( $sql['table'], $sql['rows'], @@ -515,18 +517,50 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase { [ 'table' => 'upsert_table', 'rows' => [ 'field' => 'text', 'field2' => 'text2' ], - 'uniqueIndexes' => [ 'field' ], + 'uniqueIndexes' => 'field', 'set' => [ 'field' => 'set' ], ], "BEGIN; " . "UPDATE upsert_table " . "SET field = 'set' " . - "WHERE ((field = 'text')); " . - "INSERT IGNORE INTO upsert_table " . + "WHERE (field = 'text'); " . + "INSERT INTO upsert_table " . "(field,field2) " . "VALUES ('text','text2'); " . "COMMIT" ], + [ + [ + 'table' => 'upsert_table', + 'rows' => [ 'field' => 'text', 'field2' => 'text2' ], + 'uniqueIndexes' => [ [ 'field' ] ], + 'set' => [ 'field' => 'set' ], + ], + "BEGIN; " . + "UPDATE upsert_table " . + "SET field = 'set' " . + "WHERE (field = 'text'); " . + "INSERT INTO upsert_table " . + "(field,field2) " . + "VALUES ('text','text2'); " . + "COMMIT" + ], + [ + [ + 'table' => 'upsert_table', + 'rows' => [ 'fieldA' => 'text', 'fieldB' => 'more', 'field2' => 'text2' ], + 'uniqueIndexes' => [ [ 'fieldA', 'fieldB' ] ], + 'set' => [ 'field2' => 'set' ], + ], + "BEGIN; " . + "UPDATE upsert_table " . + "SET field2 = 'set' " . + "WHERE (fieldA = 'text' AND fieldB = 'more'); " . + "INSERT INTO upsert_table " . + "(fieldA,fieldB,field2) " . + "VALUES ('text','more','text2'); " . + "COMMIT" + ], ]; } @@ -580,7 +614,6 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase { /** * @dataProvider provideInsert * @covers Wikimedia\Rdbms\Database::insert - * @covers Wikimedia\Rdbms\Database::makeInsertOptions */ public function testInsert( $sql, $sqlText ) { $this->database->insert( @@ -634,7 +667,7 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase { /** * @dataProvider provideInsertSelect * @covers Wikimedia\Rdbms\Database::insertSelect - * @covers Wikimedia\Rdbms\Database::nativeInsertSelect + * @covers Wikimedia\Rdbms\Database::doInsertSelectNative */ public function testInsertSelect( $sql, $sqlTextNative, $sqlSelect, $sqlInsert ) { $this->database->insertSelect( @@ -746,7 +779,7 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase { /** * @covers Wikimedia\Rdbms\Database::insertSelect - * @covers Wikimedia\Rdbms\Database::nativeInsertSelect + * @covers Wikimedia\Rdbms\Database::doInsertSelectNative */ public function testInsertSelectBatching() { $dbWeb = new DatabaseTestHelper( __CLASS__, [ 'cliMode' => false ] ); @@ -860,12 +893,12 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase { ], ], "BEGIN; DELETE FROM module_deps " . - "WHERE (md_module = 'module') OR (md_skin = 'skin'); " . + "WHERE ((md_module = 'module') OR (md_skin = 'skin')); " . "INSERT INTO module_deps " . "(md_module,md_skin,md_deps) " . "VALUES ('module','skin','deps'); " . "DELETE FROM module_deps " . - "WHERE (md_module = 'module2') OR (md_skin = 'skin2'); " . + "WHERE ((md_module = 'module2') OR (md_skin = 'skin2')); " . "INSERT INTO module_deps " . "(md_module,md_skin,md_deps) " . "VALUES ('module2','skin2','deps2'); COMMIT" @@ -880,36 +913,9 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase { 'md_deps' => 'deps', ], ], - "BEGIN; INSERT INTO module_deps " . + "INSERT INTO module_deps " . "(md_module,md_skin,md_deps) " . - "VALUES ('module','skin','deps'); COMMIT" - ], - ]; - } - - /** - * @dataProvider provideNativeReplace - * @covers Wikimedia\Rdbms\Database::nativeReplace - */ - public function testNativeReplace( $sql, $sqlText ) { - $this->database->nativeReplace( - $sql['table'], - $sql['rows'], - __METHOD__ - ); - $this->assertLastSql( $sqlText ); - } - - public static function provideNativeReplace() { - return [ - [ - [ - 'table' => 'replace_table', - 'rows' => [ 'field' => 'text', 'field2' => 'text2' ], - ], - "REPLACE INTO replace_table " . - "(field,field2) " . - "VALUES ('text','text2')" + "VALUES ('module','skin','deps')" ], ]; } diff --git a/tests/phpunit/includes/db/DatabasePostgresTest.php b/tests/phpunit/integration/includes/db/DatabasePostgresTest.php index 4a9be12b9bed..481118db065b 100644 --- a/tests/phpunit/includes/db/DatabasePostgresTest.php +++ b/tests/phpunit/integration/includes/db/DatabasePostgresTest.php @@ -145,7 +145,7 @@ class DatabasePostgresTest extends MediaWikiTestCase { } /** - * @covers Wikimedia\Rdbms\DatabasePostgres::nativeInsertSelect + * @covers Wikimedia\Rdbms\DatabasePostgres::doInsertSelectNative */ public function testInsertSelectIgnoreOld() { if ( !$this->db instanceof DatabasePostgres ) { @@ -167,7 +167,7 @@ class DatabasePostgresTest extends MediaWikiTestCase { } /** - * @covers Wikimedia\Rdbms\DatabasePostgres::nativeInsertSelect + * @covers Wikimedia\Rdbms\DatabasePostgres::doInsertSelectNative */ public function testInsertSelectIgnoreNew() { if ( !$this->db instanceof DatabasePostgres ) { diff --git a/tests/phpunit/integration/includes/db/DatabaseSqliteTest.php b/tests/phpunit/integration/includes/db/DatabaseSqliteTest.php index b7f2c11a93b6..d23b121abaa7 100644 --- a/tests/phpunit/integration/includes/db/DatabaseSqliteTest.php +++ b/tests/phpunit/integration/includes/db/DatabaseSqliteTest.php @@ -591,13 +591,13 @@ class DatabaseSqliteTest extends \MediaWikiIntegrationTestCase { '3.7.11', 'a', [ 'a_1' => 1 ], - 'INSERT INTO a (a_1) VALUES (1);' + 'INSERT INTO a (a_1) VALUES (1);' ], [ '3.7.10', 'a', [ 'a_1' => 1 ], - 'INSERT INTO a (a_1) VALUES (1);' + 'INSERT INTO a (a_1) VALUES (1);' ], [ '3.7.11', @@ -606,7 +606,7 @@ class DatabaseSqliteTest extends \MediaWikiIntegrationTestCase { [ 'a_1' => 2 ], [ 'a_1' => 3 ] ], - 'INSERT INTO a (a_1) VALUES (2),(3);' + 'INSERT INTO a (a_1) VALUES (2),(3);' ], [ '3.7.10', @@ -616,8 +616,8 @@ class DatabaseSqliteTest extends \MediaWikiIntegrationTestCase { [ 'a_1' => 3 ] ], 'BEGIN;' . - 'INSERT INTO a (a_1) VALUES (2);' . - 'INSERT INTO a (a_1) VALUES (3);' . + 'INSERT INTO a (a_1) VALUES (2);' . + 'INSERT INTO a (a_1) VALUES (3);' . 'COMMIT;' ] ]; |