Сообщество, не могли бы вы помочь мне понять, почему ~ 3% моих сообщений не заканчивается HDFS
? Я написал простого продюсера в JAVA
, чтобы создать 10 миллионов сообщений.Конфликтный HDFS-коннектор теряет сообщения
public static final String TEST_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"myrecord\","
+ "\"fields\":["
+ " { \"name\":\"str1\", \"type\":\"string\" },"
+ " { \"name\":\"str2\", \"type\":\"string\" },"
+ " { \"name\":\"int1\", \"type\":\"int\" }"
+ "]}";
public KafkaProducerWrapper(String topic) throws UnknownHostException {
// store topic name
this.topic = topic;
// initialize kafka producer
Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("bootstrap.servers", "myserver-1:9092");
config.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
config.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
config.put("schema.registry.url", "http://myserver-1:8089");
config.put("acks", "all");
producer = new KafkaProducer(config);
// parse schema
Schema.Parser parser = new Schema.Parser();
schema = parser.parse(TEST_SCHEMA);
}
public void send() {
// generate key
int key = (int) (Math.random() * 20);
// generate record
GenericData.Record r = new GenericData.Record(schema);
r.put("str1", "text" + key);
r.put("str2", "text2" + key);
r.put("int1", key);
final ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, "K" + key, (GenericRecord) r);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
logger.error("Send failed for record {}", record, e);
messageErrorCounter++;
return;
}
logger.debug("Send succeeded for record {}", record);
messageCounter++;
}
});
}
public String getStats() { return "Messages sent: " + messageCounter + "/" + messageErrorCounter; }
public long getMessageCounter() {
return messageCounter + messageErrorCounter;
}
public void close() {
producer.close();
}
public static void main(String[] args) throws InterruptedException, UnknownHostException {
// initialize kafka producer
KafkaProducerWrapper kafkaProducerWrapper = new KafkaProducerWrapper("my-test-topic");
long max = 10000000L;
for (long i = 0; i < max; i++) {
kafkaProducerWrapper.send();
}
logger.info("producer-demo sent all messages");
while (kafkaProducerWrapper.getMessageCounter() < max)
{
logger.info(kafkaProducerWrapper.getStats());
Thread.sleep(2000);
}
logger.info(kafkaProducerWrapper.getStats());
kafkaProducerWrapper.close();
}
И я использую режим автономного Confluent HDFS Connector
в для записи данных в HDFS
. Конфигурация выглядит следующим образом:
name=hdfs-consumer-test
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=my-test-topic
hdfs.url=hdfs://my-cluster/kafka-test
hadoop.conf.dir=/etc/hadoop/conf/
flush.size=100000
rotate.interval.ms=20000
# increase timeouts to avoid CommitFailedException
consumer.session.timeout.ms=300000
consumer.request.timeout.ms=310000
heartbeat.interval.ms= 60000
session.timeout.ms= 100000
разъем записывает данные в HDFS, но после ожидания в течение 20000 мс (из-за rotate.interval.ms
) не все сообщения принимаются.
scala> spark.read.avro("/kafka-test/topics/my-test-topic/partition=*/my-test-topic*")
.count()
res0: Long = 9749015
Любая идея, в чем причина такого поведения? Где моя ошибка? Я использую Confluent 3.0.1/Kafka 10.0.0.1.
Вы видите, что последние несколько сообщений не перемещаются в HDFS? Если это так, вероятно, вы столкнулись с проблемой, описанной здесь. Https://github.com/confluentinc/kafka-connect-hdfs/pull/100 Попробуйте отправить еще одно сообщение в тему после того, как rotate.interval.ms имеет expired, чтобы проверить, что это то, с чем вы работаете. Если вам нужно повернуть в зависимости от времени, вероятно, это хорошая идея для обновления, чтобы забрать исправление. – dawsaw
Это решение! Я обновился до ** Confluent 3.1.1 **, и я могу видеть все сообщения в HDFS. Вы хотите написать это как ответ, и я даю вам то преимущество, которого вы заслуживаете? –
Да, конечно, не знал, что на самом деле была разница :) – dawsaw