2016-10-20 7 views
1

Я использую HighlevelProducer и HighlevelConsumer для отправки и получения сообщений. HighlevelConsumer настроен с autoCommit = false, поскольку я хочу фиксировать сообщения только тогда, когда он был успешно создан. Проблема состоит в том, что первое сообщение никогда не совершается.Неверный заказ при использовании autoCommit = false в HighlevelConsumer

Пример:

  • Отправка сообщений 1-10.
  • Получить сообщение 1
  • Прием сообщений 2
  • Commit Сообщение 2
  • ...
  • Получить сообщение 10
  • Commit сообщение 10
  • Commit Сообщение 1

Если я перезагрузите моего Потребителя, все сообщения от 1 до 10 обрабатываются снова. Только если я отправлю новые сообщения потребителю, старые сообщения будут зафиксированы. Это происходит для любого количества сообщений.

Мой код выглядит следующим образом:

var kafka = require('kafka-node'), 
    HighLevelConsumer = kafka.HighLevelConsumer, 
    client = new kafka.Client("localhost:2181/"); 
consumer = new HighLevelConsumer(
    client, 
    [ 
     { topic: 'mytopic' } 
    ], 
    { 
     groupId: 'my-group', 
     id: "my-consumer-1", 
     autoCommit: false 
    } 
); 

consumer.on('message', function (message) { 
    console.log("consume: " + message.offset); 
    consumer.commit(function (err, data) { 
     console.log("commited:" + message.offset); 
    }); 
    console.log("consumed:" + message.offset); 
}); 

process.on('SIGINT', function() { 
    consumer.close(true, function() { 
     process.exit(); 
    }); 
}); 

process.on('exit', function() { 
    consumer.close(true, function() { 
     process.exit(); 
    }); 
}); 
var messages = 10; 
var kafka = require('kafka-node'), 
    HighLevelProducer = kafka.HighLevelProducer, 
    client = new kafka.Client("localhost:2181/"); 
var producer = new HighLevelProducer(client, { partitionerType: 2, requireAcks: 1 }); 

producer.on('error', function (err) { console.log(err) }); 
producer.on('ready', function() { 
    for (i = 0; i < messages; i++) { 
     payloads = [{ topic: 'mytopic', messages: "" }]; 
     producer.send(payloads, function (err, data) { 
      err ? console.log(i + "err", err) : console.log(i + "data", data); 
     }); 
    } 
}); 

Я делаю что-то неправильно, или это ошибка в Кафка-узле?

+0

Возможный дубликат [Почему commitAsync не может выполнить первые 2 смещения] (http://stackoverflow.com/questions/37794718/why-commitasync-fails-to-commit-the-first-2-offsets) –

ответ

0

Коммит сообщени 2 неявную фиксацию сообщения 1.

Как вы фиксации делаются асинхронно, и фиксацией сообщения 1 и 2 сообщений выполняются быстро друг за другом (то есть, совершение 2 происходит до потребитель отправил коммит 1), первая фиксация не будет выполняться явно, и будет отправлено только одно сообщение сообщения 2.