Спарк не поддерживает это, по умолчанию. Фильтрация на одних и тех же данных дважды не так уж плоха, если вы предварительно ее кешируете, а сама фильтрация выполняется быстро.
Если это действительно только два различных типа, вы можете использовать вспомогательный метод:
implicit class RDDOps[T](rdd: RDD[T]) {
def partitionBy(f: T => Boolean): (RDD[T], RDD[T]) = {
val passes = rdd.filter(f)
val fails = rdd.filter(e => !f(e)) // Spark doesn't have filterNot
(passes, fails)
}
}
val (matches, matchesNot) = sc.parallelize(1 to 100).cache().partitionBy(_ % 2 == 0)
Но как только у вас есть несколько типов данных, просто назначить отфильтрованный на новый вал.
работает ли этот подход с API-интерфейсом Spark Java? –
Нет, Java не имеет методов расширения. –
Не следует использовать 'rdd.cache()' перед запуском фильтров? Это, безусловно, должно увеличить скорость вашего второго фильтра. –