2016-06-15 1 views
0

Мы пытаемся получить сообщения в kafka с начала, используя STORM VERSION 0.9.3, и мы можем поместить его в HBASE TABLE.Невозможно получить сообщения в STORM NEW VERSION (1.0.1) из BEGINNING

Для этого конфигурация которой мы использовали:

kafkaConfig.forceFromStart = true; 

Таким образом, мы получали сообщения от OFFSET 0 т.е. запуск в HBase таблице т.е. полных сообщений.

Но когда мы пытаемся получить сообщения от начала в kafka, используя STORM VERSION 1.0.1 и пытаясь поместить его в HBASE TABLE, мы получаем только последние сообщения. Мы не получаем сообщение с самого начала, это OFFSET 0 (то есть последнее добавленное сообщение было доступно, но не с самого начала).

Конфигурация, которую мы использовали:

kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange = true; 
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); 
kafkaConfig.ignoreZkOffsets = false; 
kafkaConfig.maxOffsetBehind = Long.MAX_VALUE; 
kafkaConfig.startOffsetTime = -2; 

Любая помощь приветствуется.

ответ

0

Вам нужно будет установить для ignoreZkOffsets значение true, если вы хотите заставить пользователя использовать указанное смещение вместо чтения его из Zookeeper.

Из документации storm-kafka:

Это означает, что, когда топология выполняется один раз заходящего KafkaConfig.startOffsetTime не будет иметь эффект для последующих пробегов топологии, потому что теперь топология будет опираться на информация о состоянии потребителя (смещения) в ZooKeeper для определения от , где оно должно начинаться (точнее: возобновить). Если вы хотите, чтобы заставлял носик игнорировать информацию о состоянии потребителя, хранящуюся в ZooKeeper, тогда вы должны установить параметр . KafkaConfig.ignoreZkOffsets - true. Если это правда, то носик всегда будет начнет отсчет со смещения, определенного KafkaConfig.startOffsetTime , как описано выше.

Таким образом, для потребления сообщений от начала очереди, используйте расположенную ниже конфигурацию:

kafkaConfig.ignoreZkOffsets = true; 
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); 
+0

спасибо за Ваш ответ. –

+0

Но мы уже установили эту конфигурацию в нашем коде, а также попытались установить kafkaConfig.startOffsetTime = -2; .. мы можем читать все сообщения из kafka, но при вставке в таблицу hbase запускается только последний msg ... любая помощь.. –

 Смежные вопросы

  • Нет связанных вопросов^_^