2016-11-30 6 views
1

Сообщество, не могли бы вы помочь мне понять, почему ~ 3% моих сообщений не заканчивается HDFS? Я написал простого продюсера в JAVA, чтобы создать 10 миллионов сообщений.Конфликтный HDFS-коннектор теряет сообщения

public static final String TEST_SCHEMA = "{" 
     + "\"type\":\"record\"," 
     + "\"name\":\"myrecord\"," 
     + "\"fields\":[" 
     + " { \"name\":\"str1\", \"type\":\"string\" }," 
     + " { \"name\":\"str2\", \"type\":\"string\" }," 
     + " { \"name\":\"int1\", \"type\":\"int\" }" 
     + "]}"; 

public KafkaProducerWrapper(String topic) throws UnknownHostException { 
    // store topic name 
    this.topic = topic; 

    // initialize kafka producer 
    Properties config = new Properties(); 
    config.put("client.id", InetAddress.getLocalHost().getHostName()); 
    config.put("bootstrap.servers", "myserver-1:9092"); 
    config.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); 
    config.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); 
    config.put("schema.registry.url", "http://myserver-1:8089"); 
    config.put("acks", "all"); 

    producer = new KafkaProducer(config); 

    // parse schema 
    Schema.Parser parser = new Schema.Parser(); 
    schema = parser.parse(TEST_SCHEMA); 
} 

public void send() { 
    // generate key 
    int key = (int) (Math.random() * 20); 

    // generate record 
    GenericData.Record r = new GenericData.Record(schema); 
    r.put("str1", "text" + key); 
    r.put("str2", "text2" + key); 
    r.put("int1", key); 

    final ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, "K" + key, (GenericRecord) r); 
    producer.send(record, new Callback() { 
     public void onCompletion(RecordMetadata metadata, Exception e) { 
      if (e != null) { 
       logger.error("Send failed for record {}", record, e); 
       messageErrorCounter++; 
       return; 
      } 
      logger.debug("Send succeeded for record {}", record); 
      messageCounter++; 
     } 
    }); 
} 

public String getStats() { return "Messages sent: " + messageCounter + "/" + messageErrorCounter; } 

public long getMessageCounter() { 
    return messageCounter + messageErrorCounter; 
} 

public void close() { 
    producer.close(); 
} 

public static void main(String[] args) throws InterruptedException, UnknownHostException { 
    // initialize kafka producer 
    KafkaProducerWrapper kafkaProducerWrapper = new KafkaProducerWrapper("my-test-topic"); 

    long max = 10000000L; 
    for (long i = 0; i < max; i++) { 
     kafkaProducerWrapper.send(); 
    } 

    logger.info("producer-demo sent all messages"); 
    while (kafkaProducerWrapper.getMessageCounter() < max) 
    { 
     logger.info(kafkaProducerWrapper.getStats()); 
     Thread.sleep(2000); 
    } 

    logger.info(kafkaProducerWrapper.getStats()); 
    kafkaProducerWrapper.close(); 
} 

И я использую режим автономного Confluent HDFS Connector в для записи данных в HDFS. Конфигурация выглядит следующим образом:

name=hdfs-consumer-test 
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector 
tasks.max=1 

topics=my-test-topic 

hdfs.url=hdfs://my-cluster/kafka-test 
hadoop.conf.dir=/etc/hadoop/conf/ 
flush.size=100000 
rotate.interval.ms=20000 

# increase timeouts to avoid CommitFailedException 
consumer.session.timeout.ms=300000 
consumer.request.timeout.ms=310000 

heartbeat.interval.ms= 60000 
session.timeout.ms= 100000 

разъем записывает данные в HDFS, но после ожидания в течение 20000 мс (из-за rotate.interval.ms) не все сообщения принимаются.

scala> spark.read.avro("/kafka-test/topics/my-test-topic/partition=*/my-test-topic*") 
    .count() 
res0: Long = 9749015 

Любая идея, в чем причина такого поведения? Где моя ошибка? Я использую Confluent 3.0.1/Kafka 10.0.0.1.

+2

Вы видите, что последние несколько сообщений не перемещаются в HDFS? Если это так, вероятно, вы столкнулись с проблемой, описанной здесь. Https://github.com/confluentinc/kafka-connect-hdfs/pull/100 Попробуйте отправить еще одно сообщение в тему после того, как rotate.interval.ms имеет expired, чтобы проверить, что это то, с чем вы работаете. Если вам нужно повернуть в зависимости от времени, вероятно, это хорошая идея для обновления, чтобы забрать исправление. – dawsaw

+0

Это решение! Я обновился до ** Confluent 3.1.1 **, и я могу видеть все сообщения в HDFS. Вы хотите написать это как ответ, и я даю вам то преимущество, которого вы заслуживаете? –

+1

Да, конечно, не знал, что на самом деле была разница :) – dawsaw

ответ

1

Вы видите, что последние несколько сообщений не перемещаются в HDFS? Если это так, вероятно, вы столкнулись с проблемой, описанной здесь. https://github.com/confluentinc/kafka-connect-hdfs/pull/100

Попробуйте отправить еще одно сообщение в тему после истечения срока действия rotate.interval.ms, чтобы проверить, что именно вы работаете. Если вам нужно повернуть в зависимости от времени, вероятно, это хорошая идея для обновления, чтобы забрать исправление.