diff options
author | Timo Tijhof <krinkle@fastmail.com> | 2022-03-25 00:06:08 +0000 |
---|---|---|
committer | Timo Tijhof <krinkle@fastmail.com> | 2022-03-25 00:07:22 +0000 |
commit | 2de79774e1eb4578502a496020079eb3b3a20fe6 (patch) | |
tree | 74bc81f88bcac0e82e406531d1a289d77c7ded1d | |
parent | 1fbbcb6371517cac6d8b8c918be1a94b75f62594 (diff) | |
download | mediawikicore-2de79774e1eb4578502a496020079eb3b3a20fe6.tar.gz mediawikicore-2de79774e1eb4578502a496020079eb3b3a20fe6.zip |
Remove deprecated EventRelayerKafka and KafkaHandler
Also remove the unmaintained kafka-php package from the from "suggested"
and "dev" composer dependencies, as this is now no longer used.
Change-Id: If5668974f417b627df95bce47db18d46fa03327c
-rw-r--r-- | RELEASE-NOTES-1.39 | 2 | ||||
-rw-r--r-- | autoload.php | 1 | ||||
-rw-r--r-- | composer.json | 4 | ||||
-rw-r--r-- | includes/debug/logger/monolog/KafkaHandler.php | 278 | ||||
-rw-r--r-- | includes/libs/eventrelayer/EventRelayerKafka.php | 69 | ||||
-rw-r--r-- | tests/phpunit/unit/includes/debug/logger/monolog/KafkaHandlerTest.php | 211 |
6 files changed, 3 insertions, 562 deletions
diff --git a/RELEASE-NOTES-1.39 b/RELEASE-NOTES-1.39 index 8c2de7d4f744..9416c7714ba1 100644 --- a/RELEASE-NOTES-1.39 +++ b/RELEASE-NOTES-1.39 @@ -80,6 +80,8 @@ because of Phabricator reports. - ::fetchRow() - ::numRows() - ::freeResult() +* EventRelayerKafka, deprecated in 1.38, was removed. +* MediaWiki\Logger\Monolog\KafkaHandler, deprecated in 1.38, was removed. * Database::wasKnownStatementRollbackError() was removed. Subclasses should override isKnownStatementRollbackError() instead. * Changes to skins: diff --git a/autoload.php b/autoload.php index de08fff6b8ca..aa1a2706897a 100644 --- a/autoload.php +++ b/autoload.php @@ -442,7 +442,6 @@ $wgAutoloadLocalClasses = [ 'EtcdConfigParseError' => __DIR__ . '/includes/config/EtcdConfigParseError.php', 'EventRelayer' => __DIR__ . '/includes/libs/eventrelayer/EventRelayer.php', 'EventRelayerGroup' => __DIR__ . '/includes/EventRelayerGroup.php', - 'EventRelayerKafka' => __DIR__ . '/includes/libs/eventrelayer/EventRelayerKafka.php', 'EventRelayerNull' => __DIR__ . '/includes/libs/eventrelayer/EventRelayerNull.php', 'ExecutableFinder' => __DIR__ . '/includes/utils/ExecutableFinder.php', 'Exif' => __DIR__ . '/includes/media/Exif.php', diff --git a/composer.json b/composer.json index f62b0f521c71..978930e365a1 100644 --- a/composer.json +++ b/composer.json @@ -90,7 +90,6 @@ "mediawiki/mediawiki-codesniffer": "38.0.0", "mediawiki/mediawiki-phan-config": "0.11.0", "nikic/php-parser": "^4.10.2", - "nmred/kafka-php": "0.1.5", "php-parallel-lint/php-console-highlighter": "0.5", "php-parallel-lint/php-parallel-lint": "1.3.1", "phpunit/phpunit": "^8.5", @@ -117,8 +116,7 @@ "ext-sockets": "Enable CLI concurrent processing, e.g. for rebuildLocalisationCache.php.", "ext-wikidiff2": "Faster text difference engine.", "ext-zlib": "Enable use of GZIP compression, e.g. for SqlBagOStuff (ParserCache), $wgCompressRevisions, or $wgUseFileCache.", - "monolog/monolog": "Enable use of MonologSpi ($wgMWLoggerDefaultSpi).", - "nmred/kafka-php": "Enable use of KafkaHandler (MonologSpi), or EventRelayerKafka ($wgEventRelayerConfig)." + "monolog/monolog": "Enable use of MonologSpi ($wgMWLoggerDefaultSpi)." }, "autoload": { "psr-0": { diff --git a/includes/debug/logger/monolog/KafkaHandler.php b/includes/debug/logger/monolog/KafkaHandler.php deleted file mode 100644 index bfc0a47bc862..000000000000 --- a/includes/debug/logger/monolog/KafkaHandler.php +++ /dev/null @@ -1,278 +0,0 @@ -<?php -/** - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - * http://www.gnu.org/copyleft/gpl.html - * - * @file - */ - -namespace MediaWiki\Logger\Monolog; - -use Kafka\MetaDataFromKafka; -use Kafka\Produce; -use Kafka\Protocol\Decoder; -use MediaWiki\Logger\LoggerFactory; -use Monolog\Handler\AbstractProcessingHandler; -use Monolog\Logger; -use Psr\Log\LoggerInterface; - -/** - * Log handler sends log events to a kafka server. - * - * Constructor options array arguments: - * * alias: map from monolog channel to kafka topic name. When no - * alias exists the topic "monolog_$channel" will be used. - * * swallowExceptions: Swallow exceptions that occur while talking to - * kafka. Defaults to false. - * * logExceptions: Log exceptions talking to kafka here. Either null, - * the name of a channel to log to, or an object implementing - * FormatterInterface. Defaults to null. - * - * Requires the nmred/kafka-php library, version >= 1.3.0 - * - * @deprecated since 1.38 - * @since 1.26 - * @author Erik Bernhardson <ebernhardson@wikimedia.org> - * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation. - */ -class KafkaHandler extends AbstractProcessingHandler { - /** - * @var Produce Sends requests to kafka - */ - protected $produce; - - /** - * @var array Optional handler configuration - */ - protected $options; - - /** - * @var array Map from topic name to partition this request produces to - */ - protected $partitions = []; - - /** - * @var array defaults for constructor options - */ - private const DEFAULT_OPTIONS = [ - 'alias' => [], // map from monolog channel to kafka topic - 'swallowExceptions' => false, // swallow exceptions sending records - 'logExceptions' => null, // A PSR3 logger to inform about errors - 'requireAck' => 0, - ]; - - /** - * @param Produce $produce Kafka instance to produce through - * @param array $options optional handler configuration - * @param int $level The minimum logging level at which this handler will be triggered - * @param bool $bubble Whether the messages that are handled can bubble up the stack or not - */ - public function __construct( - Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true - ) { - wfDeprecated( __CLASS__, '1.38' ); - parent::__construct( $level, $bubble ); - $this->produce = $produce; - $this->options = array_merge( self::DEFAULT_OPTIONS, $options ); - } - - /** - * Constructs the necessary support objects and returns a KafkaHandler - * instance. - * - * @param string[] $kafkaServers - * @param array $options - * @param int $level The minimum logging level at which this handle will be triggered - * @param bool $bubble Whether the messages that are handled can bubble the stack or not - * @return KafkaHandler - */ - public static function factory( - $kafkaServers, array $options = [], $level = Logger::DEBUG, $bubble = true - ) { - $metadata = new MetaDataFromKafka( $kafkaServers ); - $produce = new Produce( $metadata ); - - if ( isset( $options['sendTimeout'] ) ) { - $timeOut = $options['sendTimeout']; - $produce->getClient()->setStreamOption( 'SendTimeoutSec', 0 ); - $produce->getClient()->setStreamOption( 'SendTimeoutUSec', - intval( $timeOut * 1000000 ) - ); - } - if ( isset( $options['recvTimeout'] ) ) { - $timeOut = $options['recvTimeout']; - $produce->getClient()->setStreamOption( 'RecvTimeoutSec', 0 ); - $produce->getClient()->setStreamOption( 'RecvTimeoutUSec', - intval( $timeOut * 1000000 ) - ); - } - if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) { - $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] ); - } - - if ( isset( $options['requireAck'] ) ) { - $produce->setRequireAck( $options['requireAck'] ); - } - - return new self( $produce, $options, $level, $bubble ); - } - - /** - * @inheritDoc - */ - protected function write( array $record ): void { - if ( $record['formatted'] !== null ) { - $this->addMessages( $record['channel'], [ $record['formatted'] ] ); - $this->send(); - } - } - - /** - * @inheritDoc - * @phan-param array[] $batch - */ - public function handleBatch( array $batch ): void { - $channels = []; - foreach ( $batch as $record ) { - if ( $record['level'] < $this->level ) { - continue; - } - $channels[$record['channel']][] = $this->processRecord( $record ); - } - - $formatter = $this->getFormatter(); - foreach ( $channels as $channel => $records ) { - $messages = []; - foreach ( $records as $idx => $record ) { - $message = $formatter->format( $record ); - if ( $message !== null ) { - $messages[] = $message; - } - } - if ( $messages ) { - $this->addMessages( $channel, $messages ); - } - } - - $this->send(); - } - - /** - * Send any records in the kafka client internal queue. - */ - protected function send() { - try { - $response = $this->produce->send(); - } catch ( \Kafka\Exception $e ) { - $ignore = $this->warning( - 'Error sending records to kafka: {exception}', - [ 'exception' => $e ] ); - if ( !$ignore ) { - throw $e; - } else { - return; - } - } - - if ( is_bool( $response ) ) { - return; - } - - $errors = []; - foreach ( $response as $topicName => $partitionResponse ) { - foreach ( $partitionResponse as $partition => $info ) { - if ( $info['errCode'] === 0 ) { - // no error - continue; - } - $errors[] = sprintf( - 'Error producing to %s (errno %d): %s', - $topicName, - $info['errCode'], - Decoder::getError( $info['errCode'] ) - ); - } - } - - if ( $errors ) { - $error = implode( "\n", $errors ); - if ( !$this->warning( $error ) ) { - throw new \RuntimeException( $error ); - } - } - } - - /** - * @param string $topic Name of topic to get partition for - * @return int|null The random partition to produce to for this request, - * or null if a partition could not be determined. - */ - protected function getRandomPartition( $topic ) { - if ( !array_key_exists( $topic, $this->partitions ) ) { - try { - $partitions = $this->produce->getAvailablePartitions( $topic ); - } catch ( \Kafka\Exception $e ) { - $ignore = $this->warning( - 'Error getting metadata for kafka topic {topic}: {exception}', - [ 'topic' => $topic, 'exception' => $e ] ); - if ( $ignore ) { - return null; - } - throw $e; - } - if ( $partitions ) { - $key = array_rand( $partitions ); - $this->partitions[$topic] = $partitions[$key]; - } else { - $details = $this->produce->getClient()->getTopicDetail( $topic ); - $ignore = $this->warning( - 'No partitions available for kafka topic {topic}', - [ 'topic' => $topic, 'kafka' => $details ] - ); - if ( !$ignore ) { - throw new \RuntimeException( "No partitions available for kafka topic $topic" ); - } - $this->partitions[$topic] = null; - } - } - return $this->partitions[$topic]; - } - - /** - * Adds records for a channel to the Kafka client internal queue. - * - * @param string $channel Name of Monolog channel records belong to - * @param array $records List of records to append - */ - protected function addMessages( $channel, array $records ) { - $topic = $this->options['alias'][$channel] ?? "monolog_$channel"; - $partition = $this->getRandomPartition( $topic ); - if ( $partition !== null ) { - $this->produce->setMessages( $topic, $partition, $records ); - } - } - - /** - * @param string $message PSR3 compatible message string - * @param array $context PSR3 compatible log context - * @return bool true if caller should ignore warning - */ - protected function warning( $message, array $context = [] ) { - if ( $this->options['logExceptions'] instanceof LoggerInterface ) { - $this->options['logExceptions']->warning( $message, $context ); - } - return $this->options['swallowExceptions']; - } -} diff --git a/includes/libs/eventrelayer/EventRelayerKafka.php b/includes/libs/eventrelayer/EventRelayerKafka.php deleted file mode 100644 index d38f3d18f65f..000000000000 --- a/includes/libs/eventrelayer/EventRelayerKafka.php +++ /dev/null @@ -1,69 +0,0 @@ -<?php - -use Kafka\Produce; - -/** - * Event relayer for Apache Kafka. - * Configuring for WANCache: - * 'relayerConfig' => [ 'class' => 'EventRelayerKafka', 'KafkaEventHost' => 'localhost:9092' ], - * - * @see $wgEventRelayerConfig - * @since 1.27 - * @deprecated since 1.38 - */ -class EventRelayerKafka extends EventRelayer { - /** - * Configuration. - * - * @var Config - */ - protected $config; - - /** - * Kafka producer. - * - * @var Produce - */ - protected $producer; - - /** - * Create Kafka producer. - * - * @param array $params - */ - public function __construct( array $params ) { - wfDeprecated( __CLASS__, '1.38' ); - - parent::__construct( $params ); - - $this->config = new HashConfig( $params ); - if ( !$this->config->has( 'KafkaEventHost' ) ) { - throw new InvalidArgumentException( "KafkaEventHost must be configured" ); - } - } - - /** - * Get the producer object from kafka-php. - * @return Produce - */ - protected function getKafkaProducer() { - if ( !$this->producer ) { - $this->producer = Produce::getInstance( - null, null, $this->config->get( 'KafkaEventHost' ) ); - } - return $this->producer; - } - - protected function doNotify( $channel, array $events ) { - $jsonEvents = array_map( 'json_encode', $events ); - try { - $producer = $this->getKafkaProducer(); - $producer->setMessages( $channel, 0, $jsonEvents ); - $producer->send(); - } catch ( \Kafka\Exception $e ) { - $this->logger->warning( "Sending events failed: $e" ); - return false; - } - return true; - } -} diff --git a/tests/phpunit/unit/includes/debug/logger/monolog/KafkaHandlerTest.php b/tests/phpunit/unit/includes/debug/logger/monolog/KafkaHandlerTest.php deleted file mode 100644 index 6c2d5c0e9dd0..000000000000 --- a/tests/phpunit/unit/includes/debug/logger/monolog/KafkaHandlerTest.php +++ /dev/null @@ -1,211 +0,0 @@ -<?php -/** - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - * http://www.gnu.org/copyleft/gpl.html - * - * @file - */ - -namespace MediaWiki\Logger\Monolog; - -use Kafka\Exception; -use Monolog\Logger; - -/** - * @covers \MediaWiki\Logger\Monolog\KafkaHandler - */ -class KafkaHandlerTest extends \MediaWikiUnitTestCase { - - protected function setUp(): void { - parent::setUp(); - if ( !class_exists( \Monolog\Handler\AbstractProcessingHandler::class ) - || !class_exists( \Kafka\Produce::class ) - ) { - $this->markTestSkipped( 'Monolog and Kafka are required for the KafkaHandlerTest' ); - } - $this->hideDeprecated( KafkaHandler::class ); - } - - public function topicNamingProvider() { - return [ - [ [], 'monolog_foo' ], - [ [ 'alias' => [ 'foo' => 'bar' ] ], 'bar' ] - ]; - } - - /** - * @dataProvider topicNamingProvider - */ - public function testTopicNaming( $options, $expect ) { - $produce = $this->getMockBuilder( \Kafka\Produce::class ) - ->disableOriginalConstructor() - ->getMock(); - $produce->method( 'getAvailablePartitions' ) - ->willReturn( [ 'A' ] ); - $produce->expects( $this->once() ) - ->method( 'setMessages' ) - ->with( $expect, $this->anything(), $this->anything() ); - $produce->method( 'send' ) - ->willReturn( true ); - - $handler = new KafkaHandler( $produce, $options ); - $handler->handle( [ - 'channel' => 'foo', - 'level' => Logger::EMERGENCY, - 'extra' => [], - 'context' => [], - ] ); - } - - public function swallowsExceptionsWhenRequested() { - return [ - // defaults to false - [ [], true ], - // also try false explicitly - [ [ 'swallowExceptions' => false ], true ], - // turn it on - [ [ 'swallowExceptions' => true ], false ], - ]; - } - - /** - * @dataProvider swallowsExceptionsWhenRequested - */ - public function testGetAvailablePartitionsException( $options, $expectException ) { - $produce = $this->getMockBuilder( \Kafka\Produce::class ) - ->disableOriginalConstructor() - ->getMock(); - $produce->method( 'getAvailablePartitions' ) - ->will( $this->throwException( new Exception ) ); - $produce->method( 'send' ) - ->willReturn( true ); - - if ( $expectException ) { - $this->expectException( Exception::class ); - } - - $handler = new KafkaHandler( $produce, $options ); - $handler->handle( [ - 'channel' => 'foo', - 'level' => Logger::EMERGENCY, - 'extra' => [], - 'context' => [], - ] ); - - if ( !$expectException ) { - $this->assertTrue( true, 'no exception was thrown' ); - } - } - - /** - * @dataProvider swallowsExceptionsWhenRequested - */ - public function testSendException( $options, $expectException ) { - $produce = $this->getMockBuilder( \Kafka\Produce::class ) - ->disableOriginalConstructor() - ->getMock(); - $produce->method( 'getAvailablePartitions' ) - ->willReturn( [ 'A' ] ); - $produce->method( 'send' ) - ->will( $this->throwException( new Exception ) ); - - if ( $expectException ) { - $this->expectException( Exception::class ); - } - - $handler = new KafkaHandler( $produce, $options ); - $handler->handle( [ - 'channel' => 'foo', - 'level' => Logger::EMERGENCY, - 'extra' => [], - 'context' => [], - ] ); - - if ( !$expectException ) { - $this->assertTrue( true, 'no exception was thrown' ); - } - } - - public function testHandlesNullFormatterResult() { - $produce = $this->getMockBuilder( \Kafka\Produce::class ) - ->disableOriginalConstructor() - ->getMock(); - $produce->method( 'getAvailablePartitions' ) - ->willReturn( [ 'A' ] ); - $produce->expects( $this->exactly( 2 ) ) - ->method( 'setMessages' ) - ->will( $this->onConsecutiveCalls( - [ $this->anything(), $this->anything(), [ 'words' ] ], - [ $this->anything(), $this->anything(), [ 'lines' ] ] - ) ); - $produce->method( 'send' ) - ->willReturn( true ); - - $formatter = $this->createMock( \Monolog\Formatter\FormatterInterface::class ); - $formatter->method( 'format' ) - ->will( $this->onConsecutiveCalls( 'words', null, 'lines' ) ); - - $handler = new KafkaHandler( $produce, [] ); - $handler->setFormatter( $formatter ); - for ( $i = 0; $i < 3; ++$i ) { - $handler->handle( [ - 'channel' => 'foo', - 'level' => Logger::EMERGENCY, - 'extra' => [], - 'context' => [], - ] ); - } - } - - public function testBatchHandlesNullFormatterResult() { - $produce = $this->getMockBuilder( \Kafka\Produce::class ) - ->disableOriginalConstructor() - ->getMock(); - $produce->method( 'getAvailablePartitions' ) - ->willReturn( [ 'A' ] ); - $produce->expects( $this->once() ) - ->method( 'setMessages' ) - ->with( $this->anything(), $this->anything(), [ 'words', 'lines' ] ); - $produce->method( 'send' ) - ->willReturn( true ); - - $formatter = $this->createMock( \Monolog\Formatter\FormatterInterface::class ); - $formatter->method( 'format' ) - ->will( $this->onConsecutiveCalls( 'words', null, 'lines' ) ); - - $handler = new KafkaHandler( $produce, [] ); - $handler->setFormatter( $formatter ); - $handler->handleBatch( [ - [ - 'channel' => 'foo', - 'level' => Logger::EMERGENCY, - 'extra' => [], - 'context' => [], - ], - [ - 'channel' => 'foo', - 'level' => Logger::EMERGENCY, - 'extra' => [], - 'context' => [], - ], - [ - 'channel' => 'foo', - 'level' => Logger::EMERGENCY, - 'extra' => [], - 'context' => [], - ], - ] ); - } -} |