В рамках нашего текущего кластера Kafka выполняется тестирование высокой доступности (HA). Цель состоит в том, что в то время как задание производителя подталкивает данные к определенному разделу темы, все брокеров в кластере Kafka перезапускаются последовательно (Stop-first broker-restart it и после того, как первый брокер подходит, выполняет те же действия для второго брокера и скоро). Задание продюсера составляет около 7 миллионов записей в течение примерно 30 минут, пока этот тест продолжается. В конце работы было замечено, что около 1000 записей отсутствуют.Повторный перезапуск Kafka: данные теряются
Ниже приведены особенности нашего Кафки кластера: (kafka_2.10-0.8.2.0)
-3 Кафка брокеров каждый с 2 100GB монтирует
Тема была создана с: -Replication фактор 3 -min.insync.replica = 2
server.properties:
broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/drive1,/drive2
num.partitions=1
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=10000
log.retention.hours=1
log.segment.bytes=1073741824
log.retention.check.interval.ms=1800000
log.cleaner.enable=false
zookeeper.connect=ZK1:2181,ZK2:2181,ZK3:2181
zookeeper.connection.timeout.ms=10000
advertised.host.name=XXXX
auto.leader.rebalance.enable=true
auto.create.topics.enable=false
queued.max.requests=500
delete.topic.enable=true
controlled.shutdown.enable=true
unclean.leader.election=false
num.replica.fetchers=4
controller.message.queue.size=10
Producer.properties (aync производителя с новым продюсером API)
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
acks=all
buffer.memory=33554432
compression.type=snappy
batch.size=32768
linger.ms=5
max.request.size=1048576
block.on.buffer.full=true
reconnect.backoff.ms=10
retry.backoff.ms=100
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
Может кто-то поделиться какой-либо информацией о Кафке кластере и HA, чтобы гарантировать, что данные не будут потеряны при прокатке перезапуска Кафки брокеров?
Кроме того, вот мой код производителя. Это огонь и забудьте о продюсере. мы не обрабатываем неудачи явно на данный момент. Работает отлично для почти миллионов записей. Я вижу проблему, только когда брокеров Kafka перезапускают, как объяснялось выше.
public void sendMessage(List<byte[]> messages, String destination, Integer parition, String kafkaDBKey) {
for(byte[] message : messages) {
producer.send(new ProducerRecord<byte[], byte[]>(destination, parition, kafkaDBKey.getBytes(), message));
}
}
Как долго вы проводили тест в общей сложности? Может быть, некоторые сообщения были удалены из-за сохранения? 'log.retention.check' установлен в 30 минут и' log.retention.hours' до 1 часа. – alpe1
Я проработал 30 минут. Продолжительность хранения составляет около 4 часов. Я не вижу, чтобы данные удалялись. Я подтвердил это, проверив самые ранние и последние смещения этого раздела. –
Как ваш производитель справляется с ошибками? Можно ли разместить его код? – nelsonda