aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTimo Tijhof <krinkle@fastmail.com>2022-03-25 00:06:08 +0000
committerTimo Tijhof <krinkle@fastmail.com>2022-03-25 00:07:22 +0000
commit2de79774e1eb4578502a496020079eb3b3a20fe6 (patch)
tree74bc81f88bcac0e82e406531d1a289d77c7ded1d
parent1fbbcb6371517cac6d8b8c918be1a94b75f62594 (diff)
downloadmediawikicore-2de79774e1eb4578502a496020079eb3b3a20fe6.tar.gz
mediawikicore-2de79774e1eb4578502a496020079eb3b3a20fe6.zip
Remove deprecated EventRelayerKafka and KafkaHandler
Also remove the unmaintained kafka-php package from the from "suggested" and "dev" composer dependencies, as this is now no longer used. Change-Id: If5668974f417b627df95bce47db18d46fa03327c
-rw-r--r--RELEASE-NOTES-1.392
-rw-r--r--autoload.php1
-rw-r--r--composer.json4
-rw-r--r--includes/debug/logger/monolog/KafkaHandler.php278
-rw-r--r--includes/libs/eventrelayer/EventRelayerKafka.php69
-rw-r--r--tests/phpunit/unit/includes/debug/logger/monolog/KafkaHandlerTest.php211
6 files changed, 3 insertions, 562 deletions
diff --git a/RELEASE-NOTES-1.39 b/RELEASE-NOTES-1.39
index 8c2de7d4f744..9416c7714ba1 100644
--- a/RELEASE-NOTES-1.39
+++ b/RELEASE-NOTES-1.39
@@ -80,6 +80,8 @@ because of Phabricator reports.
- ::fetchRow()
- ::numRows()
- ::freeResult()
+* EventRelayerKafka, deprecated in 1.38, was removed.
+* MediaWiki\Logger\Monolog\KafkaHandler, deprecated in 1.38, was removed.
* Database::wasKnownStatementRollbackError() was removed. Subclasses should
override isKnownStatementRollbackError() instead.
* Changes to skins:
diff --git a/autoload.php b/autoload.php
index de08fff6b8ca..aa1a2706897a 100644
--- a/autoload.php
+++ b/autoload.php
@@ -442,7 +442,6 @@ $wgAutoloadLocalClasses = [
'EtcdConfigParseError' => __DIR__ . '/includes/config/EtcdConfigParseError.php',
'EventRelayer' => __DIR__ . '/includes/libs/eventrelayer/EventRelayer.php',
'EventRelayerGroup' => __DIR__ . '/includes/EventRelayerGroup.php',
- 'EventRelayerKafka' => __DIR__ . '/includes/libs/eventrelayer/EventRelayerKafka.php',
'EventRelayerNull' => __DIR__ . '/includes/libs/eventrelayer/EventRelayerNull.php',
'ExecutableFinder' => __DIR__ . '/includes/utils/ExecutableFinder.php',
'Exif' => __DIR__ . '/includes/media/Exif.php',
diff --git a/composer.json b/composer.json
index f62b0f521c71..978930e365a1 100644
--- a/composer.json
+++ b/composer.json
@@ -90,7 +90,6 @@
"mediawiki/mediawiki-codesniffer": "38.0.0",
"mediawiki/mediawiki-phan-config": "0.11.0",
"nikic/php-parser": "^4.10.2",
- "nmred/kafka-php": "0.1.5",
"php-parallel-lint/php-console-highlighter": "0.5",
"php-parallel-lint/php-parallel-lint": "1.3.1",
"phpunit/phpunit": "^8.5",
@@ -117,8 +116,7 @@
"ext-sockets": "Enable CLI concurrent processing, e.g. for rebuildLocalisationCache.php.",
"ext-wikidiff2": "Faster text difference engine.",
"ext-zlib": "Enable use of GZIP compression, e.g. for SqlBagOStuff (ParserCache), $wgCompressRevisions, or $wgUseFileCache.",
- "monolog/monolog": "Enable use of MonologSpi ($wgMWLoggerDefaultSpi).",
- "nmred/kafka-php": "Enable use of KafkaHandler (MonologSpi), or EventRelayerKafka ($wgEventRelayerConfig)."
+ "monolog/monolog": "Enable use of MonologSpi ($wgMWLoggerDefaultSpi)."
},
"autoload": {
"psr-0": {
diff --git a/includes/debug/logger/monolog/KafkaHandler.php b/includes/debug/logger/monolog/KafkaHandler.php
deleted file mode 100644
index bfc0a47bc862..000000000000
--- a/includes/debug/logger/monolog/KafkaHandler.php
+++ /dev/null
@@ -1,278 +0,0 @@
-<?php
-/**
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- */
-
-namespace MediaWiki\Logger\Monolog;
-
-use Kafka\MetaDataFromKafka;
-use Kafka\Produce;
-use Kafka\Protocol\Decoder;
-use MediaWiki\Logger\LoggerFactory;
-use Monolog\Handler\AbstractProcessingHandler;
-use Monolog\Logger;
-use Psr\Log\LoggerInterface;
-
-/**
- * Log handler sends log events to a kafka server.
- *
- * Constructor options array arguments:
- * * alias: map from monolog channel to kafka topic name. When no
- * alias exists the topic "monolog_$channel" will be used.
- * * swallowExceptions: Swallow exceptions that occur while talking to
- * kafka. Defaults to false.
- * * logExceptions: Log exceptions talking to kafka here. Either null,
- * the name of a channel to log to, or an object implementing
- * FormatterInterface. Defaults to null.
- *
- * Requires the nmred/kafka-php library, version >= 1.3.0
- *
- * @deprecated since 1.38
- * @since 1.26
- * @author Erik Bernhardson <ebernhardson@wikimedia.org>
- * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
- */
-class KafkaHandler extends AbstractProcessingHandler {
- /**
- * @var Produce Sends requests to kafka
- */
- protected $produce;
-
- /**
- * @var array Optional handler configuration
- */
- protected $options;
-
- /**
- * @var array Map from topic name to partition this request produces to
- */
- protected $partitions = [];
-
- /**
- * @var array defaults for constructor options
- */
- private const DEFAULT_OPTIONS = [
- 'alias' => [], // map from monolog channel to kafka topic
- 'swallowExceptions' => false, // swallow exceptions sending records
- 'logExceptions' => null, // A PSR3 logger to inform about errors
- 'requireAck' => 0,
- ];
-
- /**
- * @param Produce $produce Kafka instance to produce through
- * @param array $options optional handler configuration
- * @param int $level The minimum logging level at which this handler will be triggered
- * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
- */
- public function __construct(
- Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true
- ) {
- wfDeprecated( __CLASS__, '1.38' );
- parent::__construct( $level, $bubble );
- $this->produce = $produce;
- $this->options = array_merge( self::DEFAULT_OPTIONS, $options );
- }
-
- /**
- * Constructs the necessary support objects and returns a KafkaHandler
- * instance.
- *
- * @param string[] $kafkaServers
- * @param array $options
- * @param int $level The minimum logging level at which this handle will be triggered
- * @param bool $bubble Whether the messages that are handled can bubble the stack or not
- * @return KafkaHandler
- */
- public static function factory(
- $kafkaServers, array $options = [], $level = Logger::DEBUG, $bubble = true
- ) {
- $metadata = new MetaDataFromKafka( $kafkaServers );
- $produce = new Produce( $metadata );
-
- if ( isset( $options['sendTimeout'] ) ) {
- $timeOut = $options['sendTimeout'];
- $produce->getClient()->setStreamOption( 'SendTimeoutSec', 0 );
- $produce->getClient()->setStreamOption( 'SendTimeoutUSec',
- intval( $timeOut * 1000000 )
- );
- }
- if ( isset( $options['recvTimeout'] ) ) {
- $timeOut = $options['recvTimeout'];
- $produce->getClient()->setStreamOption( 'RecvTimeoutSec', 0 );
- $produce->getClient()->setStreamOption( 'RecvTimeoutUSec',
- intval( $timeOut * 1000000 )
- );
- }
- if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
- $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
- }
-
- if ( isset( $options['requireAck'] ) ) {
- $produce->setRequireAck( $options['requireAck'] );
- }
-
- return new self( $produce, $options, $level, $bubble );
- }
-
- /**
- * @inheritDoc
- */
- protected function write( array $record ): void {
- if ( $record['formatted'] !== null ) {
- $this->addMessages( $record['channel'], [ $record['formatted'] ] );
- $this->send();
- }
- }
-
- /**
- * @inheritDoc
- * @phan-param array[] $batch
- */
- public function handleBatch( array $batch ): void {
- $channels = [];
- foreach ( $batch as $record ) {
- if ( $record['level'] < $this->level ) {
- continue;
- }
- $channels[$record['channel']][] = $this->processRecord( $record );
- }
-
- $formatter = $this->getFormatter();
- foreach ( $channels as $channel => $records ) {
- $messages = [];
- foreach ( $records as $idx => $record ) {
- $message = $formatter->format( $record );
- if ( $message !== null ) {
- $messages[] = $message;
- }
- }
- if ( $messages ) {
- $this->addMessages( $channel, $messages );
- }
- }
-
- $this->send();
- }
-
- /**
- * Send any records in the kafka client internal queue.
- */
- protected function send() {
- try {
- $response = $this->produce->send();
- } catch ( \Kafka\Exception $e ) {
- $ignore = $this->warning(
- 'Error sending records to kafka: {exception}',
- [ 'exception' => $e ] );
- if ( !$ignore ) {
- throw $e;
- } else {
- return;
- }
- }
-
- if ( is_bool( $response ) ) {
- return;
- }
-
- $errors = [];
- foreach ( $response as $topicName => $partitionResponse ) {
- foreach ( $partitionResponse as $partition => $info ) {
- if ( $info['errCode'] === 0 ) {
- // no error
- continue;
- }
- $errors[] = sprintf(
- 'Error producing to %s (errno %d): %s',
- $topicName,
- $info['errCode'],
- Decoder::getError( $info['errCode'] )
- );
- }
- }
-
- if ( $errors ) {
- $error = implode( "\n", $errors );
- if ( !$this->warning( $error ) ) {
- throw new \RuntimeException( $error );
- }
- }
- }
-
- /**
- * @param string $topic Name of topic to get partition for
- * @return int|null The random partition to produce to for this request,
- * or null if a partition could not be determined.
- */
- protected function getRandomPartition( $topic ) {
- if ( !array_key_exists( $topic, $this->partitions ) ) {
- try {
- $partitions = $this->produce->getAvailablePartitions( $topic );
- } catch ( \Kafka\Exception $e ) {
- $ignore = $this->warning(
- 'Error getting metadata for kafka topic {topic}: {exception}',
- [ 'topic' => $topic, 'exception' => $e ] );
- if ( $ignore ) {
- return null;
- }
- throw $e;
- }
- if ( $partitions ) {
- $key = array_rand( $partitions );
- $this->partitions[$topic] = $partitions[$key];
- } else {
- $details = $this->produce->getClient()->getTopicDetail( $topic );
- $ignore = $this->warning(
- 'No partitions available for kafka topic {topic}',
- [ 'topic' => $topic, 'kafka' => $details ]
- );
- if ( !$ignore ) {
- throw new \RuntimeException( "No partitions available for kafka topic $topic" );
- }
- $this->partitions[$topic] = null;
- }
- }
- return $this->partitions[$topic];
- }
-
- /**
- * Adds records for a channel to the Kafka client internal queue.
- *
- * @param string $channel Name of Monolog channel records belong to
- * @param array $records List of records to append
- */
- protected function addMessages( $channel, array $records ) {
- $topic = $this->options['alias'][$channel] ?? "monolog_$channel";
- $partition = $this->getRandomPartition( $topic );
- if ( $partition !== null ) {
- $this->produce->setMessages( $topic, $partition, $records );
- }
- }
-
- /**
- * @param string $message PSR3 compatible message string
- * @param array $context PSR3 compatible log context
- * @return bool true if caller should ignore warning
- */
- protected function warning( $message, array $context = [] ) {
- if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
- $this->options['logExceptions']->warning( $message, $context );
- }
- return $this->options['swallowExceptions'];
- }
-}
diff --git a/includes/libs/eventrelayer/EventRelayerKafka.php b/includes/libs/eventrelayer/EventRelayerKafka.php
deleted file mode 100644
index d38f3d18f65f..000000000000
--- a/includes/libs/eventrelayer/EventRelayerKafka.php
+++ /dev/null
@@ -1,69 +0,0 @@
-<?php
-
-use Kafka\Produce;
-
-/**
- * Event relayer for Apache Kafka.
- * Configuring for WANCache:
- * 'relayerConfig' => [ 'class' => 'EventRelayerKafka', 'KafkaEventHost' => 'localhost:9092' ],
- *
- * @see $wgEventRelayerConfig
- * @since 1.27
- * @deprecated since 1.38
- */
-class EventRelayerKafka extends EventRelayer {
- /**
- * Configuration.
- *
- * @var Config
- */
- protected $config;
-
- /**
- * Kafka producer.
- *
- * @var Produce
- */
- protected $producer;
-
- /**
- * Create Kafka producer.
- *
- * @param array $params
- */
- public function __construct( array $params ) {
- wfDeprecated( __CLASS__, '1.38' );
-
- parent::__construct( $params );
-
- $this->config = new HashConfig( $params );
- if ( !$this->config->has( 'KafkaEventHost' ) ) {
- throw new InvalidArgumentException( "KafkaEventHost must be configured" );
- }
- }
-
- /**
- * Get the producer object from kafka-php.
- * @return Produce
- */
- protected function getKafkaProducer() {
- if ( !$this->producer ) {
- $this->producer = Produce::getInstance(
- null, null, $this->config->get( 'KafkaEventHost' ) );
- }
- return $this->producer;
- }
-
- protected function doNotify( $channel, array $events ) {
- $jsonEvents = array_map( 'json_encode', $events );
- try {
- $producer = $this->getKafkaProducer();
- $producer->setMessages( $channel, 0, $jsonEvents );
- $producer->send();
- } catch ( \Kafka\Exception $e ) {
- $this->logger->warning( "Sending events failed: $e" );
- return false;
- }
- return true;
- }
-}
diff --git a/tests/phpunit/unit/includes/debug/logger/monolog/KafkaHandlerTest.php b/tests/phpunit/unit/includes/debug/logger/monolog/KafkaHandlerTest.php
deleted file mode 100644
index 6c2d5c0e9dd0..000000000000
--- a/tests/phpunit/unit/includes/debug/logger/monolog/KafkaHandlerTest.php
+++ /dev/null
@@ -1,211 +0,0 @@
-<?php
-/**
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- * http://www.gnu.org/copyleft/gpl.html
- *
- * @file
- */
-
-namespace MediaWiki\Logger\Monolog;
-
-use Kafka\Exception;
-use Monolog\Logger;
-
-/**
- * @covers \MediaWiki\Logger\Monolog\KafkaHandler
- */
-class KafkaHandlerTest extends \MediaWikiUnitTestCase {
-
- protected function setUp(): void {
- parent::setUp();
- if ( !class_exists( \Monolog\Handler\AbstractProcessingHandler::class )
- || !class_exists( \Kafka\Produce::class )
- ) {
- $this->markTestSkipped( 'Monolog and Kafka are required for the KafkaHandlerTest' );
- }
- $this->hideDeprecated( KafkaHandler::class );
- }
-
- public function topicNamingProvider() {
- return [
- [ [], 'monolog_foo' ],
- [ [ 'alias' => [ 'foo' => 'bar' ] ], 'bar' ]
- ];
- }
-
- /**
- * @dataProvider topicNamingProvider
- */
- public function testTopicNaming( $options, $expect ) {
- $produce = $this->getMockBuilder( \Kafka\Produce::class )
- ->disableOriginalConstructor()
- ->getMock();
- $produce->method( 'getAvailablePartitions' )
- ->willReturn( [ 'A' ] );
- $produce->expects( $this->once() )
- ->method( 'setMessages' )
- ->with( $expect, $this->anything(), $this->anything() );
- $produce->method( 'send' )
- ->willReturn( true );
-
- $handler = new KafkaHandler( $produce, $options );
- $handler->handle( [
- 'channel' => 'foo',
- 'level' => Logger::EMERGENCY,
- 'extra' => [],
- 'context' => [],
- ] );
- }
-
- public function swallowsExceptionsWhenRequested() {
- return [
- // defaults to false
- [ [], true ],
- // also try false explicitly
- [ [ 'swallowExceptions' => false ], true ],
- // turn it on
- [ [ 'swallowExceptions' => true ], false ],
- ];
- }
-
- /**
- * @dataProvider swallowsExceptionsWhenRequested
- */
- public function testGetAvailablePartitionsException( $options, $expectException ) {
- $produce = $this->getMockBuilder( \Kafka\Produce::class )
- ->disableOriginalConstructor()
- ->getMock();
- $produce->method( 'getAvailablePartitions' )
- ->will( $this->throwException( new Exception ) );
- $produce->method( 'send' )
- ->willReturn( true );
-
- if ( $expectException ) {
- $this->expectException( Exception::class );
- }
-
- $handler = new KafkaHandler( $produce, $options );
- $handler->handle( [
- 'channel' => 'foo',
- 'level' => Logger::EMERGENCY,
- 'extra' => [],
- 'context' => [],
- ] );
-
- if ( !$expectException ) {
- $this->assertTrue( true, 'no exception was thrown' );
- }
- }
-
- /**
- * @dataProvider swallowsExceptionsWhenRequested
- */
- public function testSendException( $options, $expectException ) {
- $produce = $this->getMockBuilder( \Kafka\Produce::class )
- ->disableOriginalConstructor()
- ->getMock();
- $produce->method( 'getAvailablePartitions' )
- ->willReturn( [ 'A' ] );
- $produce->method( 'send' )
- ->will( $this->throwException( new Exception ) );
-
- if ( $expectException ) {
- $this->expectException( Exception::class );
- }
-
- $handler = new KafkaHandler( $produce, $options );
- $handler->handle( [
- 'channel' => 'foo',
- 'level' => Logger::EMERGENCY,
- 'extra' => [],
- 'context' => [],
- ] );
-
- if ( !$expectException ) {
- $this->assertTrue( true, 'no exception was thrown' );
- }
- }
-
- public function testHandlesNullFormatterResult() {
- $produce = $this->getMockBuilder( \Kafka\Produce::class )
- ->disableOriginalConstructor()
- ->getMock();
- $produce->method( 'getAvailablePartitions' )
- ->willReturn( [ 'A' ] );
- $produce->expects( $this->exactly( 2 ) )
- ->method( 'setMessages' )
- ->will( $this->onConsecutiveCalls(
- [ $this->anything(), $this->anything(), [ 'words' ] ],
- [ $this->anything(), $this->anything(), [ 'lines' ] ]
- ) );
- $produce->method( 'send' )
- ->willReturn( true );
-
- $formatter = $this->createMock( \Monolog\Formatter\FormatterInterface::class );
- $formatter->method( 'format' )
- ->will( $this->onConsecutiveCalls( 'words', null, 'lines' ) );
-
- $handler = new KafkaHandler( $produce, [] );
- $handler->setFormatter( $formatter );
- for ( $i = 0; $i < 3; ++$i ) {
- $handler->handle( [
- 'channel' => 'foo',
- 'level' => Logger::EMERGENCY,
- 'extra' => [],
- 'context' => [],
- ] );
- }
- }
-
- public function testBatchHandlesNullFormatterResult() {
- $produce = $this->getMockBuilder( \Kafka\Produce::class )
- ->disableOriginalConstructor()
- ->getMock();
- $produce->method( 'getAvailablePartitions' )
- ->willReturn( [ 'A' ] );
- $produce->expects( $this->once() )
- ->method( 'setMessages' )
- ->with( $this->anything(), $this->anything(), [ 'words', 'lines' ] );
- $produce->method( 'send' )
- ->willReturn( true );
-
- $formatter = $this->createMock( \Monolog\Formatter\FormatterInterface::class );
- $formatter->method( 'format' )
- ->will( $this->onConsecutiveCalls( 'words', null, 'lines' ) );
-
- $handler = new KafkaHandler( $produce, [] );
- $handler->setFormatter( $formatter );
- $handler->handleBatch( [
- [
- 'channel' => 'foo',
- 'level' => Logger::EMERGENCY,
- 'extra' => [],
- 'context' => [],
- ],
- [
- 'channel' => 'foo',
- 'level' => Logger::EMERGENCY,
- 'extra' => [],
- 'context' => [],
- ],
- [
- 'channel' => 'foo',
- 'level' => Logger::EMERGENCY,
- 'extra' => [],
- 'context' => [],
- ],
- ] );
- }
-}