2016-08-29 8 views
0

Я использую Kafka consumer здесь (версия 1.3.1).python kafka: как сделать каждый msg потребляемым только один раз по группе с начала

То, что я для того чтобы достигнуть:

  • Есть 10 разделов. каждая секция начинается со смещения 0.

  • Существует группа потребителей (1,2,3, например).

  • Иногда один потребитель вниз или вверх.

  • Таким образом, члены группы могут измениться. Но я хочу, чтобы каждое сообщение в каждом разделе было потреблено группой только один раз (1 ИЛИ 2 ИЛИ 3).

Мои коды:

consumer = KafkaConsumer('my_topic', 
      bootstrap_servers=['ip:9092'], 
      auto_offset_reset='earliest', 
      max_partition_fetch_bytes=131072, 
      group_id='writer.test') 

ли вышеописанная конфигурация достаточно? Любые комментарии приветствуются. Благодаря

UPDATE

Я попытался следующие коды. Каждый раз в разделе 760 каждое сообщение может потребляться дважды двумя потребителями в одной группе. Зачем? Что-то не так?

def test(): 
    #PULL FROM KAFKA 
    consumer = KafkaConsumer(
      'topic', 
      bootstrap_servers=[ip], 
      auto_offset_reset='latest', 
      max_partition_fetch_bytes=131072, 
      auto_commit_interval_ms=500, 
      group_id='writer2.test') 

    print consumer.poll() 
    for i in range(10000): 
     msg = next(consumer) 
     if str(msg[1])=='670': 
      print 'partition= %s, offset= %s' % (msg[1], msg[2]) 
    consumer.unsubscribe() 


if __name__ == "__main__": 
    for i in range(10): 
     import time 
     time.sleep(5) 
     test() 

Выход 1:

{} 
partition= 670, offset= 224 
partition= 670, offset= 225 
partition= 670, offset= 226 
partition= 670, offset= 227 
partition= 670, offset= 228 
partition= 670, offset= 229 
partition= 670, offset= 230 
partition= 670, offset= 231 
partition= 670, offset= 232 
partition= 670, offset= 233 
partition= 670, offset= 234 
partition= 670, offset= 235 
partition= 670, offset= 236 
partition= 670, offset= 237 
partition= 670, offset= 238 
partition= 670, offset= 239 
partition= 670, offset= 240 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 

Выполнить тот же файл в другом окне, выход:

{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
{} 
partition= 670, offset= 241 
partition= 670, offset= 242 
partition= 670, offset= 243 
partition= 670, offset= 244 
partition= 670, offset= 245 
partition= 670, offset= 246 
partition= 670, offset= 247 
partition= 670, offset= 248 
partition= 670, offset= 249 
partition= 670, offset= 250 
partition= 670, offset= 251 
partition= 670, offset= 252 
partition= 670, offset= 253 
partition= 670, offset= 254 
partition= 670, offset= 255 
partition= 670, offset= 256 
partition= 670, offset= 257 
partition= 670, offset= 258 
partition= 670, offset= 259 
+0

Вы используете единый брокер или несколько групп брокеров? –

+0

@MoinuddinQuadri несколько кластер брокеров. Любая разница? – BAE

+0

Я считаю, что ваша проблема в том, что у вас много потребителей. И данные, которые вы передаете, были получены всеми потребителями. Но вы хотите, чтобы его потребляли только один потребитель. Я прав? –

ответ

0

При использовании групп потребителей, Кафка обеспечивает, по-крайней мере, один раз гарантий доставки , таким образом, при отказе потребителя от повторного назначения этих разделов потребителя некоторые сообщения могут быть доставлены во второй раз.

Если вы хотите удостовериться, что сообщение не обрабатывается дважды, вы можете переключить свой шаблон на максимальные гарантии доставки. Однако для этого вы можете потерять некоторые сообщения (т. Е. Никогда не обрабатываться) в случае сбоя.

Чтобы включить максимально-один раз, вам необходимо отключить автоматическую фиксацию и выполнить вручную directy после poll, т.е. перед тем, как начать обработку сообщений, полученных через poll.

Для получения более подробной информации см. http://docs.confluent.io/3.0.0/clients/consumer.html#detailed-examples (даже если примеры не находятся в Python, общий шаблон одинаков).