Я ищу способ протестировать приложение Kafka Streams. Чтобы я мог определять входные события, а тестовый набор показывает мне результат.Тестирование топологии Kafka Streams
Возможно ли это без реальной настройки Kafka?
Я ищу способ протестировать приложение Kafka Streams. Чтобы я мог определять входные события, а тестовый набор показывает мне результат.Тестирование топологии Kafka Streams
Возможно ли это без реальной настройки Kafka?
ProcessorTopologyTestDriver
доступен как в 0.11.0.0.Он доступен в kafka-streams
тестовом артефакта (указывается с <classifier>test</classifier>
в Maven):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.11.0.0</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
Вам также необходимо добавить kafka-clients
тест артефакт:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
Затем вы можете использовать тестовый драйвер. Согласно Javadoc, сначала создайте ProcessorTopologyTestDriver
:
StringSerializer strSerializer = new StringSerializer();
StringDeserializer strDeserializer = new StringDeserializer();
Properties props = new Properties();
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
StreamsConfig config = new StreamsConfig(props);
TopologyBuilder builder = ...
ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
Вы можете покормить вход в топологии, как если бы вы на самом деле написано в одной из тем ввода:
driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);
И читать выходные темы:
ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);
Тогда вы можете утверждать об этих результатах.
Вы можете просто запустить одного Zookeeper и брокера локально, чтобы протестировать приложение Kafka Streams.
Просто следуйте этому быстрый направляющий старт:
Также проверьте это Кафка Streams примеры (с подробной прогулкой по инструкции в JavaDocs):
Как вы спрашиваете, если это возможно, чтобы проверить Кафка Streams приложения без реальной установки Кафки, вы можете попробовать эту библиотеку Передразнивало Streams в Scala. Mocked Streams 1.0 - это библиотека для Scala> = 2.11.8, которая позволяет вам тестировать топологию обработки приложений Kafka Streams (с Apache Kafka> = 0.10.1) без Zookeeper и Kafka Brokers. Ссылка: https://github.com/jpzk/mockedstreams
Вы также можете использовать scalatest встраиваемый-Кафка, который представляет собой библиотеку, которая обеспечивает в памяти Кафка брокера для запуска ваших ScalaTest спецификации против. Он использует Kafka 0.10.1.1 и ZooKeeper 3.4.8.
Ссылка: https://github.com/manub/scalatest-embedded-kafka#scalatest-embedded-kafka-streams
Успехов!
спасибо, я посмотрю на это – imehl
Вы должны проверить комплект Kafka here.
Ваша установка тест должен выглядеть следующим образом:
KafkaUnit kafkaUnitServer = new KafkaUnit();
kafkaUnitServer.startup();
kafkaUnitServer.createTopic(testTopic);
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
kafkaUnitServer.sendMessages(keyedMessage);
А потом читать ваши сообщения и утверждать, что все прошло нормально, вы делаете что-то вроде этого:
List<String> messages = kafkaUnitServer.readMessages(testTopic, 1);
Это на самом деле раскручивает встроенная кафка, которая помогает вам иметь все, что вам нужно, в тесте.
Вы можете получить немного любителя и настроить ваш встроенный Кафка как setup()
метода (или setupSpec()
в Спке) и остановить вашу встроенную Кафка в teardown()
.
вы можете использовать https://github.com/jpzk/mockedstreams смотрите пример ниже ...
import com.madewithtea.mockedstreams.MockedStreams
val input = Seq(("x", "v1"), ("y", "v2"))
val exp = Seq(("x", "V1"), ("y", "V2"))
val strings = Serdes.String()
MockedStreams()
.topology { builder => builder.stream(...) [...] }
.input("topic-in", strings, strings, input)
.output("topic-out", strings, strings, exp.size) shouldEqual exp
надеюсь, что это помогает ...
Весна kafka имеет поддержку для модульных испытаний со встроенной кафкой, см. https://docs.spring.io/spring-kafka/docs/2.1.0.RELEASE/reference/html/_reference.html#__embeddedkafka_annotation.
Также команда kafka работает над выпуском тестового драйвера для потоков https://issues.apache.org/jira/browse/KAFKA-3625.
Да, это то, что я сейчас делаю – imehl
Я вижу. Вы хотите сделать модульные тесты - это было непонятно из вашего вопроса ... Некоторые люди используют внутренние тестовые классы Kafka Streams - они официально не являются частью релиза, но вы можете просто получить от https://github.com/Apache/Kafka/дерево/ствол/потоки/SRC/тест/Java/орг/Apache/Кафка. Для шаблонов использования см. Интеграционные тесты Kafka Streams. Единственным недостатком является то, что эти классы могут меняться без уведомления (потому что они являются внутренними). Таким образом, ваша тестовая установка может сломаться, если позже вы обновите эти классы из github. Но до тех пор, пока вы останетесь с одним выпуском, если нужно работать правильно. –