6

Для потоков Kafka, если мы используем API-интерфейс более низкого уровня, мы можем управлять фиксацией или нет. Поэтому, если в нашем коде возникают проблемы, и мы не хотим передавать это сообщение. В этом случае Kafka будет повторно обновлять это сообщение несколько раз, пока проблема не будет исправлена.Как обрабатывать ошибки и не совершать при использовании Kafka Streams DSL

Но как контролировать, передавать ли сообщение при использовании своего DSL-интерфейса более высокого уровня?

Ресурсы:

http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html

+0

Документы, на которые вы ссылаетесь, являются довольно старыми и устаревшими. Лучше использовать http://docs.confluent.io/current/streams/index.html –

ответ

8

Ваше утверждение не совсем верно. Вы не можете «управлять фиксацией или нет» - по крайней мере, не напрямую (ни в API-интерфейсе процессора, ни в DSL). Вы можете использовать только ProcessorContext#commit() для запроса дополнительных коммитов. Таким образом, после вызова #commit() потоки пытаются совершить как можно скорее, но это не немедленная фиксация. Кроме того, потоки будут фиксироваться автоматически, даже если вы никогда не звоните #commit(). Вы можете контролировать потоки совершают интервал через Streams конфигурации commit.interval.m (см http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application)

В случае «проблемы», это зависит от типа проблемы вы имеете, как ответить на него:

  • , если вы обнаружите проблему, от которой невозможно восстановить, вы можете только выбросить исключение и «остановить мир» (см. ниже).
  • :
  • Если у вас есть исправляемая ошибка, вам необходимо «закодировать» внутри вашего собственного кода (например, в пределах Processor#process() или KeyValueMapper#apply(), пока проблема не будет решена, и вы сможете успешно обработать текущее сообщение (обратите внимание, что вы можете запустить таймаут , то есть исключение, используя эту стратегию - ср потребительских heartbeat.interval.ms и изменение настроек для 0.10.1 session.timeout.ms[KIP-62])
  • альтернатива была бы, чтобы положить записи, которые не могут быть обработаны прямо сейчас в StateStore и обработать их позже, однако. , трудно получить право, а также прервать несколько предположений Streams (например, порядок обработки). Не рекомендуется использовать, и если они используются, вы должны быть очень осторожны в отношении последствий.

Если нет неперехваченное исключение, StreamThread умрёт и не совершить случается (вы можете зарегистрировать обработчик исключений, чтобы получить уведомление об этом: http://docs.confluent.io/current/streams/developer-guide.html#using-kafka-streams-within-your-application-code. Если все вы StreamThread умерли, вам нужно будет создать новый экземпляр KafkaStreams, чтобы перезагрузить приложение.

Вы не должны возвращаться из кода пользователя до того, как сообщение успешно обработано, потому что если вы вернетесь, Streams предполагает, что сообщение успешно обработано (и, таким образом, может зафиксировать соответствующее смещение). Что касается маркерной точки (3), то запись в специальный StateStore для последующей обработки считается «успешно» обработанной записью.

+0

Большое спасибо, Маттиас. Ваши комментарии заставили меня больше узнать о внутренностях Кафки. И для восстановимой ошибки (например, при записи данных на целевой сервер, который в это время недоступен), мы выбираем повторить попытку навсегда, пока он не будет исправлен. Это блокирует потребителя, но в нашем случае это нормально. –

+0

Не могли бы вы предоставить ссылку на некоторую документацию о том, как KafkaStreams обрабатывает смещение.Похоже, вы говорите, что смещение не будет зафиксировано до тех пор, пока код пользователя не закончится и не будет зафиксирован, если код пользователя выдает ошибку. Это верно? –

+0

То, что вы говорите, правильно. К сожалению, нет доступной документации (только код ... если вы хотите вникнуть в это, проверьте метод StreamsThread # maybeCommit() '). –