2017-01-24 4 views
2

Я ищу способ протестировать приложение Kafka Streams. Чтобы я мог определять входные события, а тестовый набор показывает мне результат.Тестирование топологии Kafka Streams

Возможно ли это без реальной настройки Kafka?

ответ

7

ProcessorTopologyTestDriver доступен как в 0.11.0.0.Он доступен в kafka-streams тестовом артефакта (указывается с <classifier>test</classifier> в Maven):

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-streams</artifactId> 
    <version>0.11.0.0</version> 
    <classifier>test</classifier> 
    <scope>test</scope> 
</dependency> 

Вам также необходимо добавить kafka-clients тест артефакт:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.11.0.0</version> 
    <classifier>test</classifier> 
    <scope>test</scope> 
</dependency> 

Затем вы можете использовать тестовый драйвер. Согласно Javadoc, сначала создайте ProcessorTopologyTestDriver:

StringSerializer strSerializer = new StringSerializer(); 
StringDeserializer strDeserializer = new StringDeserializer(); 
Properties props = new Properties(); 
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); 
props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName()); 
props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName()); 
props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName()); 
props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName()); 
props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName()); 
StreamsConfig config = new StreamsConfig(props); 
TopologyBuilder builder = ... 
ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder); 

Вы можете покормить вход в топологии, как если бы вы на самом деле написано в одной из тем ввода:

driver.process("input-topic", "key1", "value1", strSerializer, strSerializer); 

И читать выходные темы:

ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer); 
ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer); 
ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer); 

Тогда вы можете утверждать об этих результатах.

0

Вы можете просто запустить одного Zookeeper и брокера локально, чтобы протестировать приложение Kafka Streams.

Просто следуйте этому быстрый направляющий старт:

Также проверьте это Кафка Streams примеры (с подробной прогулкой по инструкции в JavaDocs):

+0

Да, это то, что я сейчас делаю – imehl

+0

Я вижу. Вы хотите сделать модульные тесты - это было непонятно из вашего вопроса ... Некоторые люди используют внутренние тестовые классы Kafka Streams - они официально не являются частью релиза, но вы можете просто получить от https://github.com/Apache/Kafka/дерево/ствол/потоки/SRC/тест/Java/орг/Apache/Кафка. Для шаблонов использования см. Интеграционные тесты Kafka Streams. Единственным недостатком является то, что эти классы могут меняться без уведомления (потому что они являются внутренними). Таким образом, ваша тестовая установка может сломаться, если позже вы обновите эти классы из github. Но до тех пор, пока вы останетесь с одним выпуском, если нужно работать правильно. –

1
  1. Как вы спрашиваете, если это возможно, чтобы проверить Кафка Streams приложения без реальной установки Кафки, вы можете попробовать эту библиотеку Передразнивало Streams в Scala. Mocked Streams 1.0 - это библиотека для Scala> = 2.11.8, которая позволяет вам тестировать топологию обработки приложений Kafka Streams (с Apache Kafka> = 0.10.1) без Zookeeper и Kafka Brokers. Ссылка: https://github.com/jpzk/mockedstreams

  2. Вы также можете использовать scalatest встраиваемый-Кафка, который представляет собой библиотеку, которая обеспечивает в памяти Кафка брокера для запуска ваших ScalaTest спецификации против. Он использует Kafka 0.10.1.1 и ZooKeeper 3.4.8.
    Ссылка: https://github.com/manub/scalatest-embedded-kafka#scalatest-embedded-kafka-streams

Успехов!

+0

спасибо, я посмотрю на это – imehl

0

Вы должны проверить комплект Kafka here.

Ваша установка тест должен выглядеть следующим образом:

KafkaUnit kafkaUnitServer = new KafkaUnit(); 
kafkaUnitServer.startup(); 
kafkaUnitServer.createTopic(testTopic); 
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value"); 
kafkaUnitServer.sendMessages(keyedMessage); 

А потом читать ваши сообщения и утверждать, что все прошло нормально, вы делаете что-то вроде этого:

List<String> messages = kafkaUnitServer.readMessages(testTopic, 1); 

Это на самом деле раскручивает встроенная кафка, которая помогает вам иметь все, что вам нужно, в тесте.

Вы можете получить немного любителя и настроить ваш встроенный Кафка как setup() метода (или setupSpec() в Спке) и остановить вашу встроенную Кафка в teardown().

0

вы можете использовать https://github.com/jpzk/mockedstreams смотрите пример ниже ...

import com.madewithtea.mockedstreams.MockedStreams 

val input = Seq(("x", "v1"), ("y", "v2")) 
val exp = Seq(("x", "V1"), ("y", "V2")) 
val strings = Serdes.String() 

MockedStreams() 
    .topology { builder => builder.stream(...) [...] } 
    .input("topic-in", strings, strings, input) 
    .output("topic-out", strings, strings, exp.size) shouldEqual exp 

надеюсь, что это помогает ...