2017-02-19 16 views
2

Работает нормально, если исходный раздел разделов = 1. Если я разбиваю разделы на любое значение> 1, я вижу ошибку ниже. Применимо как к низкому уровню, так и к API DSL. Любые указатели? Что может быть пропало?Не удалось перезагрузить ошибку в потоках Kafka с более чем одним разделом тем

org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance 
     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410) 
     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) 

Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] Store in-memory-avg-store's change log (cpu-streamz-in-memory-avg-store-changelog) does not contain partition 1 
     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:185) 
     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123) 
     at org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier$MemoryStore.init(InMemoryKeyValueStoreSupplier.java:102) 
     at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:56) 
     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85) 
     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81) 
     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119) 
     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633) 
     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660) 
     at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69) 
     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) 
     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407) 

ответ

3

Это операционная проблема. Потоки Kafka не позволяют изменять количество разделов входных тем в течение «жизненного цикла».

Если вы остановите запуск приложения Kafka Streams, измените количество разделов раздела ввода и перезапустите приложение, оно сломается (с ошибкой, которую вы видите выше). Трудно исправить это для производственных случаев, и настоятельно рекомендуется, чтобы не изменил количество разделов раздела ввода (см. Комментарий ниже). Для POC/demos это не сложно исправить.

Для того, чтобы исправить это, вы должны сбросить приложения с помощью инструмента перезагрузки приложения Кафки:

Используя инструмент сброса приложений, имеет тот недостаток, что вы вытирать из всего состояния вашей заявки. Таким образом, чтобы ваше приложение было в том же состоянии, что и раньше, вам нужно переработать весь вводный текст с начала. Это, конечно, возможно только в том случае, если все входные данные все еще доступны, и ничто не было удалено брокерами, которые применяют политику сохранения времени/размера темы.

Кроме того, следует отметить, что добавление разделов к темам ввода изменяет схему разбиения на разделы (по умолчанию используется хэш-разделение по ключу). Поскольку Kafka Streams предполагает, что темы ввода правильно разделены ключом, если вы используете инструмент сброса и обрабатываете все данные, вы можете получить неправильный результат, поскольку «старые» данные разделяются иначе, чем «новые» данные (т. Е. Данные, записанные после добавления новые разделы). Для производственных случаев вам нужно будет прочитать все данные из вашей исходной темы и записать их в новую тему (с увеличением количества разделов), чтобы правильно распределить ваши данные (или, конечно, этот шаг может изменить порядок записей с разными ключи - что не должно быть проблемой обычно - просто хотел бы упомянуть об этом). Впоследствии вы можете использовать новую тему в качестве темы ввода для своего приложения Streams.

Этот шаг перераспределения также может быть легко выполнен в рамках приложения Streams с помощью оператора through("new_topic_with_more_partitions") непосредственно после прочтения исходной темы и перед выполнением любой реальной обработки.

В целом, однако, рекомендуется перераспределить свои темы для производственных случаев, так что вам больше не нужно будет менять количество разделов позже. Накладные расходы на перегородки довольно малы и сэкономит вам много хлопот позже. Это общая рекомендация, если вы работаете с Kafka - это не ограничивается случаями использования Streams. более

Одно замечание:

Некоторые люди могли бы предложить, чтобы увеличить число разделов Кафки Streams внутренние темы вручную. Во-первых, это будет взломать и не рекомендуется по определенным причинам.

  1. Возможно, сложно определить, что такое правильный номер, поскольку это зависит от различных факторов (так как это внутренняя деталь реализации Stream).
  2. Вы также сталкиваетесь с проблемой взлома схемы разбиения, как описано в параграфе выше. Таким образом, вы, скорее всего, оказываетесь в несогласованном состоянии.

Во избежание несогласованного состояния приложения потоки не удаляют внутренние темы или автоматически не изменяют количество разделов внутренних тем, но с сообщением об ошибке, о котором вы сообщили. Это гарантирует, что пользователь осознает все последствия, выполнив «очистку» вручную.

Btw: Для предстоящей Кафки 0.10.2 это сообщение об ошибке получил улучшилось: https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L100-L103