Я использую платформу confluent-3.0.1 и создаю разъем Kafka-Elasticsearch. Для этого я использую SinkConnector и SinkTask (API-интерфейс Kafka-connect) для получения данных от Kafka.Как установить max.poll.records в API-интерфейсе Kafka-Connect платформы Confluent
В качестве части этого кода я расширяю метод TaskConfigs от SinkConnector, чтобы вернуть «max.poll.records» для извлечения только 100 записей за раз. Но он не работает, и я получаю все записи одновременно, и я не могу совершать смещения в течение установленного времени. Пожалуйста, может ли один помочь мне настроить «max.poll.records»
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> config = new HashMap<String, String>();
config.put(ConfigurationConstants.CLUSTER_NAME, clusterName);
config.put(ConfigurationConstants.HOSTS, hosts);
config.put(ConfigurationConstants.BULK_SIZE, bulkSize);
config.put(ConfigurationConstants.IDS, elasticSearchIds);
config.put(ConfigurationConstants.TOPICS_SATELLITE_DATA, topics);
config.put(ConfigurationConstants.PUBLISH_TOPIC, topicTopublish);
config.put(ConfigurationConstants.TYPES, elasticSearchTypes);
config.put("max.poll.records", "100");
configs.add(config);
}
return configs;
}
BTW, Confluent 3.1 (выпущенный сегодня) включает в себя соединительный разъем Elasticsearch, в случае, если это отвечает вашим потребностям. http://docs.confluent.io/3.1.0/connect/connect-elasticsearch/docs/index.html – shikhar