2015-01-20 7 views
5

Я получаю предупреждение при использовании RDD в составе, и я не уверен, что это то, что я делаю неправильно. Если я делаю это:Предупреждение при использовании RDD для понимания

val sc = new SparkContext(...) 

val anRDD = sc.parallelize(List(
    ("a", List(1, 2, 3)), 
    ("b", List(4), 
    ("c", List(5, 6)) 
) 

for { 
    (someString, listOfInts) <- anRDD 
    someInt <- listOfInts 
} yield (someString, someInt) 

Тогда я получаю этот выход:

warning: `withFilter' method does not yet exist on org.apache.spark.rdd.RDD[(String, List[Int])], using `filter' method instead 
    (s, li) <- rl 

Но это все еще успешно возвращает FlatMappedRDD [(String, Int)]. Я делаю что-то неправильно? Или можно ли игнорировать это предупреждение?

Update: Я также хотел бы принять в качестве ответа как для постижение преобразует эти операции для отображения/flatMap/фильтр вызовов, так как я не думаю, что не было бы никаких фильтров или withFilter вызовов требуется. Я предположил, что это было бы эквивалентно что-то похожее на это:

anRDD.flatMap(tuple => tuple.map(someInt => (tuple._1, someInt))) 

Но это не относится к какой-либо фильтр или withFilter вызовы, которые, кажется, быть источником предупреждения.

О, я использую Spark 1.2.0, Scala 2.10.4, и все это находится в REPL.

ответ

1

Во-первых, я не эксперт, но сделал некоторые копали, и вот что я нашел:

Я скомпилировал код, используя -print (с JavaDecompiler терпел неудачу по какой-то причине), который будет распечатать программу с удаленными функциями Scala. Там я увидел:

test.this.anRDD().filter({ 
    (new anonymous class anonfun$1(): Function1) 
    }).flatMap({ 
    (new anonymous class anonfun$2(): Function1) 
    }, ClassTag.apply(classOf[scala.Tuple2])); 

Вы заметите filter ... так, я проверил на anonfun$1:

public final boolean apply(Tuple2<String, List<Object>> check$ifrefutable$1) 
    { 
    Tuple2 localTuple2 = check$ifrefutable$1; 
    boolean bool; 
    if (localTuple2 != null) { 
     bool = true; 
    } else { 
     bool = false; 
    } 
    return bool; 
    } 

Так что, если вы положили все это вместе, кажется, что filter происходит в понимании, потому что он отфильтровывает все, что НЕ Tuple2.

И, предпочтительнее использовать withFilter вместо filter (не знаете, почему atm). Вы можете видеть, что декомпиляции обычный список вместо RDD

object test { 
    val regList = List(
    ("a", List(1, 2, 3)), 
    ("b", List(4)), 
    ("c", List(5, 6)) 
) 

val foo = for { 
    (someString, listOfInts) <- regList 
    someInt <- listOfInts 
} yield (someString, someInt) 
} 

Какие декомпилирует на:

test.this.regList().withFilter({ 
    (new anonymous class anonfun$1(): Function1) 
}).flatMap({ 
    (new anonymous class anonfun$2(): Function1) 
}, immutable.this.List.canBuildFrom()).$asInstanceOf[List](); 

Итак, это то же самое, за исключением того, что использует withFilter где он может

+0

Не обязательно. Где происходит вызов withFilter? Я предполагаю (как я всегда делаю с Scala), что это происходит из-за неявного преобразования где-то, но я не понимаю, почему это имеет значение здесь, поскольку для понимания понимает, что он вызывает .filter, а не .withFilter – jayhutfles

+0

Поскольку предпочтение следует использовать withFilter для предупреждения. В этом случае он просто возвращается к методу фильтрации. –

+0

@jayhutfles Я просто добавил больше, показывая предпочтение withFilter –

0

Вызовите collect() в RDD перед отправкой его на понимание.

val collectedList = anRDD.collect 
for { 
    (someString, listOfInts) <- collectedList 
    someInt <- listOfInts 
} yield (someString, someInt)