Ниже существующий код, который я использую для чтения создать Dstream [String] из Кафки Спарк потоковый:Initialize createDirectStream с MessageHandler
var messages: DStream[String] = null
val topicsSet = kafkaTopicsName.split(",").toSet
messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsSet).map(_._2)
message.foreachRDD { rdd =>
. . . . .
}
streamingContext.start()
streamingContext.awaitTermination()
Я хотел, чтобы перейти от:
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](ssc: StreamingContext,
kafkaParams: Map[String, String], topics: Set[String])(implicit arg0: ClassTag[K],
arg1: ClassTag[V], arg2: ClassTag[KD], arg3: ClassTag[VD]): InputDStream[(K, V)]
в
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R]
(jssc: JavaStreamingContext, keyClass: Class[K], valueClass: Class[V],
keyDecoderClass: Class[KD], valueDecoderClass: Class[VD], recordClass: Class[R],
kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long],
messageHandler: Function[MessageAndMetadata[K, V], R]): JavaInputDStream[R]
Так что я могу управлять смещением искрового потребителя самостоятельно. Я понял, как получить доступ к смещенным диапазонам от Zookeeper, а затем назначить диапазоны смещения createDirectStream
Но я не уверен, как передать обработчик сообщений и класс R. Что именно эти два параметра делают? На самом деле appritiate, если я могу получить образец фрагмента кода в scala для указанного выше конструктора.
Код находится в Scala. эквивалентный синтаксис для scala – user1523567