2016-11-05 5 views
1

Ниже существующий код, который я использую для чтения создать Dstream [String] из Кафки Спарк потоковый:Initialize createDirectStream с MessageHandler

var messages: DStream[String] = null 
val topicsSet = kafkaTopicsName.split(",").toSet 
messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsSet).map(_._2) 
message.foreachRDD { rdd => 
. . . . . 
} 
streamingContext.start() 
streamingContext.awaitTermination() 

Я хотел, чтобы перейти от:

def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](ssc: StreamingContext, 
    kafkaParams: Map[String, String], topics: Set[String])(implicit arg0: ClassTag[K], 
    arg1: ClassTag[V], arg2: ClassTag[KD], arg3: ClassTag[VD]): InputDStream[(K, V)] 

в

def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R] 
    (jssc: JavaStreamingContext, keyClass: Class[K], valueClass: Class[V], 
    keyDecoderClass: Class[KD], valueDecoderClass: Class[VD], recordClass: Class[R], 
    kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long], 
    messageHandler: Function[MessageAndMetadata[K, V], R]): JavaInputDStream[R] 

Так что я могу управлять смещением искрового потребителя самостоятельно. Я понял, как получить доступ к смещенным диапазонам от Zookeeper, а затем назначить диапазоны смещения createDirectStream

Но я не уверен, как передать обработчик сообщений и класс R. Что именно эти два параметра делают? На самом деле appritiate, если я могу получить образец фрагмента кода в scala для указанного выше конструктора.

ответ

0

Вы можете использовать что-то вроде этого.

KafkaUtils.createDirectStream (sparkStreamContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, fromOffsets, (функция, String>) MessageAndMetadata :: сообщение);

Это будет возвращать JavaInputDStream, состоящий из ваших сообщений на основе fromOffsets

+0

Код находится в Scala. эквивалентный синтаксис для scala – user1523567

1

Я понял одно решение:

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
    streamingContext, kafkaParams, topicMap, (mmd: MessageAndMetadata[String, String]) => mmd.message())