0

Я использую платформу confluent-3.0.1 и создаю разъем Kafka-Elasticsearch. Для этого я использую SinkConnector и SinkTask (API-интерфейс Kafka-connect) для получения данных от Kafka.Как установить max.poll.records в API-интерфейсе Kafka-Connect платформы Confluent

В качестве части этого кода я расширяю метод TaskConfigs от SinkConnector, чтобы вернуть «max.poll.records» для извлечения только 100 записей за раз. Но он не работает, и я получаю все записи одновременно, и я не могу совершать смещения в течение установленного времени. Пожалуйста, может ли один помочь мне настроить «max.poll.records»

public List<Map<String, String>> taskConfigs(int maxTasks) { 
    ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>(); 
    for (int i = 0; i < maxTasks; i++) { 
     Map<String, String> config = new HashMap<String, String>(); 
     config.put(ConfigurationConstants.CLUSTER_NAME, clusterName); 
     config.put(ConfigurationConstants.HOSTS, hosts); 
     config.put(ConfigurationConstants.BULK_SIZE, bulkSize); 
     config.put(ConfigurationConstants.IDS, elasticSearchIds); 
     config.put(ConfigurationConstants.TOPICS_SATELLITE_DATA, topics); 
     config.put(ConfigurationConstants.PUBLISH_TOPIC, topicTopublish); 
     config.put(ConfigurationConstants.TYPES, elasticSearchTypes); 
     config.put("max.poll.records", "100"); 

     configs.add(config); 
    } 
    return configs; 
    } 
+0

BTW, Confluent 3.1 (выпущенный сегодня) включает в себя соединительный разъем Elasticsearch, в случае, если это отвечает вашим потребностям. http://docs.confluent.io/3.1.0/connect/connect-elasticsearch/docs/index.html – shikhar

ответ

3

Вы не можете переопределить большинство Кафку потребителей конфиги как max.poll.records в конфигурации соединителя. Это можно сделать в конфигурации Connect Connect, но с префиксом consumer..

+0

Создаю файл worker.properties и предоставил указанное свойство в файле свойств и выполнил команду ниже. sh ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-elasticsearch/worker.properties ./etc/kafka-connect-elasticsearch/connector.properties> connectorlogs.log Но получение исключения. org.apache.kafka.common.config.ConfigException: Отсутствует требуемая конфигурация «connector.class», которая не имеет значения по умолчанию. – Renukaradhya

+0

my worker.properties содержит "group.id" = operative1. «operative1.max.poll.records» = 1000, а my connector.properties содержит правильный «connector.class», но все еще получает эту ошибку. – Renukaradhya

+0

вам нужно 'customer.max.poll.records = 1000' в настройке рабочего пользователя – shikhar

0

Было решено. Я добавил ниже конфигурации в connect-avro-standalone.properties

group.id=mygroup 
consumer.max.poll.records=1000 

и побежал ниже команду для запуска моего разъема.

sh ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-elasticsearch/connect-elasticsearch-sink.properties