Я хочу потреблять сообщения из темы Кафки с помощью Scala 2.10.6
и Spark 1.6.2
. Для Кафки Я использую эту зависимость:Невозможно скомпилировать потребителя Kafka в Scala
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.2</version>
</dependency>
Этот код компилируется нормально, но я хочу, чтобы определить auto.offset.reset
и здесь возникает проблема:
val topicMap = topic.split(",").map((_, kafkaNumThreads.toInt)).toMap
val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
Когда я добавляю kafkaParams
, он не компилируется больше:
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> group,
"zookeeper.connection.timeout.ms" -> "10000",
"auto.offset.reset" -> "smallest")
val data = KafkaUtils.createStream(ssc, kafkaParams, topicMap,
StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
сообщение об ошибке:
94: error: missing parameter type for expanded function ((x$3) => x$3._2)
[ERROR] StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
Я пробовал много разных настроек параметров createStream
, но все не удается. Может кто-нибудь помочь, пожалуйста?
Хорошо, позвольте мне проверить. В самом деле проблема в том, что я хочу потреблять сообщения из удаленной очереди Kafka. Я могу заставить их использовать команду 'curl' и confluent API от терминала. Но я не получаю их, если я запустил код Scala. Итак, мое предположение заключается в том, что мне нужно указать смещение. – Dinosaurius
@Dinosaurius Ошибка в вашем вопросе не имеет ничего общего с смещениями. Это просто компилятор, который не смог вывести правильный тип. –
Да, я знаю. Я просто хотел объяснить, почему мне нужно установить смещение. – Dinosaurius