2015-09-28 6 views
0

В рамках нашего текущего кластера Kafka выполняется тестирование высокой доступности (HA). Цель состоит в том, что в то время как задание производителя подталкивает данные к определенному разделу темы, все брокеров в кластере Kafka перезапускаются последовательно (Stop-first broker-restart it и после того, как первый брокер подходит, выполняет те же действия для второго брокера и скоро). Задание продюсера составляет около 7 миллионов записей в течение примерно 30 минут, пока этот тест продолжается. В конце работы было замечено, что около 1000 записей отсутствуют.Повторный перезапуск Kafka: данные теряются

Ниже приведены особенности нашего Кафки кластера: (kafka_2.10-0.8.2.0)

-3 Кафка брокеров каждый с 2 ​​100GB монтирует

Тема была создана с: -Replication фактор 3 -min.insync.replica = 2

server.properties:

broker.id=1 
num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=1048576 
socket.receive.buffer.bytes=1048576 
socket.request.max.bytes=104857600 
log.dirs=/drive1,/drive2 
num.partitions=1 
num.recovery.threads.per.data.dir=1 
log.flush.interval.messages=10000 
log.retention.hours=1 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=1800000 
log.cleaner.enable=false 
zookeeper.connect=ZK1:2181,ZK2:2181,ZK3:2181 
zookeeper.connection.timeout.ms=10000 
advertised.host.name=XXXX 
auto.leader.rebalance.enable=true 
auto.create.topics.enable=false 
queued.max.requests=500 
delete.topic.enable=true 
controlled.shutdown.enable=true 
unclean.leader.election=false 
num.replica.fetchers=4 
controller.message.queue.size=10 

Producer.properties (aync производителя с новым продюсером API)

bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 
acks=all 
buffer.memory=33554432 
compression.type=snappy 
batch.size=32768 
linger.ms=5 
max.request.size=1048576 
block.on.buffer.full=true 
reconnect.backoff.ms=10 
retry.backoff.ms=100 
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 

Может кто-то поделиться какой-либо информацией о Кафке кластере и HA, чтобы гарантировать, что данные не будут потеряны при прокатке перезапуска Кафки брокеров?

Кроме того, вот мой код производителя. Это огонь и забудьте о продюсере. мы не обрабатываем неудачи явно на данный момент. Работает отлично для почти миллионов записей. Я вижу проблему, только когда брокеров Kafka перезапускают, как объяснялось выше.

public void sendMessage(List<byte[]> messages, String destination, Integer parition, String kafkaDBKey) { 

    for(byte[] message : messages) { 

     producer.send(new ProducerRecord<byte[], byte[]>(destination, parition, kafkaDBKey.getBytes(), message)); 

    } 

} 
+0

Как долго вы проводили тест в общей сложности? Может быть, некоторые сообщения были удалены из-за сохранения? 'log.retention.check' установлен в 30 минут и' log.retention.hours' до 1 часа. – alpe1

+0

Я проработал 30 минут. Продолжительность хранения составляет около 4 часов. Я не вижу, чтобы данные удалялись. Я подтвердил это, проверив самые ранние и последние смещения этого раздела. –

+0

Как ваш производитель справляется с ошибками? Можно ли разместить его код? – nelsonda

ответ

1

Увеличивая значение повторных попыток по умолчанию от 0 до 4000 на стороне производителя, мы можем успешно передавать данные без потери. retries=4000

  • Благодаря этой настройке, есть возможность отправки же сообщение дважды и сообщений из последовательности по времени потребитель получает его (второй тзд может достигнуть до первого сообщ). Но для нашей текущей проблемы это не проблема и обрабатывается на стороне потребителя, чтобы все было в порядке.

 Смежные вопросы

  • Нет связанных вопросов^_^