2017-01-05 17 views
2

Возможно, DStream есть type parameter s?Тип-параметрирование DStream

Если да, то как?

Когда я пытаюсь lazy val qwe = mStream.mapWithState(stateSpec) на myDStream: DStream[(A, B)] (параметр класса), я получаю:

value mapWithState is not a member of org.apache.spark.streaming.dstream.DStream[(A, B)] 
    lazy val qwe = mStream.mapWithState(stateSpec) 

ответ

2

Существенная подмножество Спарк API требует неявное ClassTags (см Scala: What is a TypeTag and how do I use it?) и PairDStreamFunctions.mapWithState ничем не отличается. Проверьте class definition:

class PairDStreamFunctions[K, V](self: DStream[(K, V)]) 
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K]) 

and:

def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType] 
): MapWithStateDStream[K, V, StateType, MappedType] = { 
    ... 
} 

Если хотят создать функцию, которая работает на общих пары потоков и использует mapWithState вы должны по крайней мере обеспечить ClassTags для KeyType и ValueType типов:

def foo[T : ClassTag, U : ClassTag](
    stream: DStream[(T, U)], f: StateSpec[T, U, Int, Int]) = stream.mapWithState(f) 

Если StateType и MappedType, вам также понадобится ClassTags:

def bar[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
    stream: DStream[(T, U)], f: StateSpec[T, U, V, W]) = stream.mapWithState(f)