2016-12-28 6 views
1

Я хочу потреблять сообщения из темы Кафки с помощью Scala 2.10.6 и Spark 1.6.2. Для Кафки Я использую эту зависимость:Невозможно скомпилировать потребителя Kafka в Scala

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming_2.10</artifactId> 
    <version>1.6.2</version> 
</dependency> 

Этот код компилируется нормально, но я хочу, чтобы определить auto.offset.reset и здесь возникает проблема:

val topicMap = topic.split(",").map((_, kafkaNumThreads.toInt)).toMap 
val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, 
          StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2) 

Когда я добавляю kafkaParams, он не компилируется больше:

val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> group, 
"zookeeper.connection.timeout.ms" -> "10000", 
"auto.offset.reset" -> "smallest") 

val data = KafkaUtils.createStream(ssc, kafkaParams, topicMap, 
           StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2) 

сообщение об ошибке:

94: error: missing parameter type for expanded function ((x$3) => x$3._2) 
[ERROR]             StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2) 

Я пробовал много разных настроек параметров createStream, но все не удается. Может кто-нибудь помочь, пожалуйста?

ответ

1

Чтобы определить основные типы потоков, необходимо добавить параметры типа KafkaUtils.createStream. Например, если ваш ключ и значение имеют тип String:

val data: DStream[String] = 
    KafkaUtils 
    .createStream[String, String, StringDecoder, StringDecoder](
     ssc, 
     kafkaParams, 
     topicMap, 
     StorageLevel.MEMORY_AND_DISK_SER_2 
).map(_._2) 
+0

Хорошо, позвольте мне проверить. В самом деле проблема в том, что я хочу потреблять сообщения из удаленной очереди Kafka. Я могу заставить их использовать команду 'curl' и confluent API от терминала. Но я не получаю их, если я запустил код Scala. Итак, мое предположение заключается в том, что мне нужно указать смещение. – Dinosaurius

+0

@Dinosaurius Ошибка в вашем вопросе не имеет ничего общего с смещениями. Это просто компилятор, который не смог вывести правильный тип. –

+0

Да, я знаю. Я просто хотел объяснить, почему мне нужно установить смещение. – Dinosaurius