Я использую HighlevelProducer и HighlevelConsumer для отправки и получения сообщений. HighlevelConsumer настроен с autoCommit = false, поскольку я хочу фиксировать сообщения только тогда, когда он был успешно создан. Проблема состоит в том, что первое сообщение никогда не совершается.Неверный заказ при использовании autoCommit = false в HighlevelConsumer
Пример:
- Отправка сообщений 1-10.
- Получить сообщение 1
- Прием сообщений 2
- Commit Сообщение 2
- ...
- Получить сообщение 10
- Commit сообщение 10
- Commit Сообщение 1
Если я перезагрузите моего Потребителя, все сообщения от 1 до 10 обрабатываются снова. Только если я отправлю новые сообщения потребителю, старые сообщения будут зафиксированы. Это происходит для любого количества сообщений.
Мой код выглядит следующим образом:
var kafka = require('kafka-node'),
HighLevelConsumer = kafka.HighLevelConsumer,
client = new kafka.Client("localhost:2181/");
consumer = new HighLevelConsumer(
client,
[
{ topic: 'mytopic' }
],
{
groupId: 'my-group',
id: "my-consumer-1",
autoCommit: false
}
);
consumer.on('message', function (message) {
console.log("consume: " + message.offset);
consumer.commit(function (err, data) {
console.log("commited:" + message.offset);
});
console.log("consumed:" + message.offset);
});
process.on('SIGINT', function() {
consumer.close(true, function() {
process.exit();
});
});
process.on('exit', function() {
consumer.close(true, function() {
process.exit();
});
});
var messages = 10;
var kafka = require('kafka-node'),
HighLevelProducer = kafka.HighLevelProducer,
client = new kafka.Client("localhost:2181/");
var producer = new HighLevelProducer(client, { partitionerType: 2, requireAcks: 1 });
producer.on('error', function (err) { console.log(err) });
producer.on('ready', function() {
for (i = 0; i < messages; i++) {
payloads = [{ topic: 'mytopic', messages: "" }];
producer.send(payloads, function (err, data) {
err ? console.log(i + "err", err) : console.log(i + "data", data);
});
}
});
Я делаю что-то неправильно, или это ошибка в Кафка-узле?
Возможный дубликат [Почему commitAsync не может выполнить первые 2 смещения] (http://stackoverflow.com/questions/37794718/why-commitasync-fails-to-commit-the-first-2-offsets) –