У меня есть DataFrame
, к которому я должен применить серию фильтрующих запросов. Например, я загружаю DataFrame
следующим образом.Как распараллеливать/распространять запросы/счета против Spark DataFrame?
val df = spark.read.parquet("hdfs://box/some-parquet")
У меня тогда есть куча «произвольных» фильтров следующим образом.
- C0 = 'истинный' и C1 = 'ложь'
- C0 = 'ложь' и С3 = 'истинный'
- и так далее ...
Я обычно получить эти фильтрует динамически с использованием метода утилизации.
val filters: List[String] = getFilters()
Все, что я сделать, это применить эти фильтры к DataFrame
, чтобы получить отсчеты. Например.
val counts = filters.map(filter => {
df.where(filter).count
})
Я заметил, что это НЕ параллельная/распределенная операция при сопоставлении фильтров. Если я привяжу фильтры к RDD/DataFrame, этот подход не будет работать, потому что тогда я выполнял операции с вложенными фреймами данных (которые, как я прочитал на SO, не разрешены в Spark). Что-то вроде следующего дает NullPointerException (NPE).
val df = spark.read.parquet("hdfs://box/some-parquet")
val filterRDD = spark.sparkContext.parallelize(List("C0='false'", "C1='true'"))
val counts = filterRDD.map(df.filter(_).count).collect
Caused by: java.lang.NullPointerException at org.apache.spark.sql.Dataset.filter(Dataset.scala:1127) at $anonfun$1.apply(:27) at $anonfun$1.apply(:27) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) at scala.collection.AbstractIterator.to(Iterator.scala:1336) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:86) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
Есть ли способ распараллеливания/распределить подсчета фильтров на DataFrame
в Спарк? Кстати, я на Spark v2.0.2.
Предполагая, что вы хотите достичь, это один проход над исходными данными (в противном случае, не может быть никакого выигрыша ожидать от этого), я d перерабатывать функции фильтра в UDF, которые возвращают 1 (сопоставление фильтра) или 0 (нет соответствия фильтра), добавляют 1 столбец UDF к фреймворку данных и делают groupBy/count для добавленных столбцов, что приведет к созданию 1 строки данных , держа все подсчеты. – GPI
Вы могли бы показать пример? –