2016-12-07 3 views
5

Я использую KafkaConsumer 0.10 Java api. Я хочу потреблять из определенного раздела и определенного смещения. Я поднял глаза и обнаружил, что существует метод поиска, но он бросает исключение. У кого-нибудь был похожий вариант или решение?KafkaConsumer 0.10 Сообщение об ошибке Java API: нет текущего назначения для раздела

Код:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps); 
consumer.seek(new TopicPartition("mytopic", 1), 4); 

Исключение

java.lang.IllegalStateException: No current assignment for partition mytopic-1 
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) 
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) 
    at xx.xxx.xxx.Test.main(Test.java:182) 

ответ

18

Перед тем, как можно seek() сначала нужно subscribe() к теме илиassign() раздела топика к потребителю. Также имейте в виду, что subscribe() и assign() ленивы - таким образом, вы также должны выполнить «фиктивный вызов» до poll(), прежде чем сможете использовать seek().

Если вы используете subscribe(), вы используете групповое управление: таким образом, вы можете запускать несколько пользователей, используя один и тот же group.id, и все разделы раздела будут распределены равномерно по всем потребителям в группе автоматически (каждый раздел будет присвоен один потребитель в группе).

Если вы хотите прочитать определенные разделы, вам необходимо использовать назначение вручную через assign(). Это позволяет выполнять любое задание.

Btw: KafkaConsumer имеет очень длинный подробный класс JavaDoc, включая примеры. Это стоит прочитать.

+0

Спасибо. Это сработало :) с комбинацией assign() и seek() – colossal

+1

Я думаю, вы имеете в виду 'group.id' вместо' application.id' – automaticgiant

+0

Ответ на слишком много вопросов #KafkaStream здесь ... Спасибо за указание @automaticgiant –

1

Если вы не хотите использовать poll() и извлекаете записи карты и сами изменяете смещение. Кафка версия 0,11 Попробуйте это:

... 
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);  
consumer.subscribe(Arrays.asList("Test_topic1", "Test_topic2")); 
List<TopicPartition> partitions =consumer.partitionsFor("Test_topic1").stream().map(part->{TopicPartition tp = new TopicPartition(part.topic(),part.partition()); return tp;}).collect(Collectors.toList()); 
Field coordinatorField = consumer.getClass().getDeclaredField("coordinator"); 
coordinatorField.setAccessible(true);  

ConsumerCoordinator coordinator = (ConsumerCoordinator)coordinatorField.get(consumer); 
coordinator.poll(new Date().getTime(), 1000);//Watch out for your local date and time settings 
consumer.seekToBeginning(partitions); //or other seek 

Опрос для координатора мероприятий. Это гарантирует, что координатор известен и что потребитель присоединился к группе (если он использует управление группами). Это также обрабатывает периодические смещения, если они включены.

 Смежные вопросы

  • Нет связанных вопросов^_^