2

У меня есть DataFrame, к которому я должен применить серию фильтрующих запросов. Например, я загружаю DataFrame следующим образом.Как распараллеливать/распространять запросы/счета против Spark DataFrame?

val df = spark.read.parquet("hdfs://box/some-parquet") 

У меня тогда есть куча «произвольных» фильтров следующим образом.

  • C0 = 'истинный' и C1 = 'ложь'
  • C0 = 'ложь' и С3 = 'истинный'
  • и так далее ...

Я обычно получить эти фильтрует динамически с использованием метода утилизации.

val filters: List[String] = getFilters() 

Все, что я сделать, это применить эти фильтры к DataFrame, чтобы получить отсчеты. Например.

val counts = filters.map(filter => { 
df.where(filter).count 
}) 

Я заметил, что это НЕ параллельная/распределенная операция при сопоставлении фильтров. Если я привяжу фильтры к RDD/DataFrame, этот подход не будет работать, потому что тогда я выполнял операции с вложенными фреймами данных (которые, как я прочитал на SO, не разрешены в Spark). Что-то вроде следующего дает NullPointerException (NPE).

val df = spark.read.parquet("hdfs://box/some-parquet") 
val filterRDD = spark.sparkContext.parallelize(List("C0='false'", "C1='true'")) 
val counts = filterRDD.map(df.filter(_).count).collect 
 
Caused by: java.lang.NullPointerException 
    at org.apache.spark.sql.Dataset.filter(Dataset.scala:1127) 
    at $anonfun$1.apply(:27) 
    at $anonfun$1.apply(:27) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Есть ли способ распараллеливания/распределить подсчета фильтров на DataFrame в Спарк? Кстати, я на Spark v2.0.2.

+1

Предполагая, что вы хотите достичь, это один проход над исходными данными (в противном случае, не может быть никакого выигрыша ожидать от этого), я d перерабатывать функции фильтра в UDF, которые возвращают 1 (сопоставление фильтра) или 0 (нет соответствия фильтра), добавляют 1 столбец UDF к фреймворку данных и делают groupBy/count для добавленных столбцов, что приведет к созданию 1 строки данных , держа все подсчеты. – GPI

+0

Вы могли бы показать пример? –

ответ

1

Таким образом, единственным ожидаемым коэффициентом усиления (который может быть очень существенным) будет передавать только один раз на входные данные.

Я хотел бы сделать это как так (программного решения, но эквивалент SQL можно):

  1. Преобразование фильтров UDF, которые возвращают 1 или 0
  2. Добавьте один столбец для каждого из этих UDFS
  3. Группа По сумме ваших данных.

Образца искра сессия выглядит следующим образом:

scala> val data = spark.createDataFrame(Seq("A", "BB", "CCC").map(Tuple1.apply)).withColumnRenamed("_1", "input") 

data: org.apache.spark.sql.DataFrame = [input: string] 

scala> data.show 
+-----+ 
|input| 
+-----+ 
| A| 
| BB| 
| CCC| 
+-----+ 

scala> val containsBFilter = udf((input: String) => if(input.contains("B")) 1 else 0) 
containsBFilter: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType))) 

scala> val lengthFilter = udf((input: String) => if (input.length < 3) 1 else 0) 
lengthFilter: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType))) 

scala> data.withColumn("inputLength", lengthFilter($"input")).withColumn("containsB", containsBFilter($"input")).select(sum($"inputLength"), sum($"containsB")).show 

+----------------+--------------+ 
|sum(inputLength)|sum(containsB)| 
+----------------+--------------+ 
|    2|    1| 
+----------------+--------------+ 
+0

это очень умно. Мне это нравится. вы думаете, что использование параллельного класса Scala может помочь распараллелить? с вашим подходом, как вы заявили, это один проход по данным, но UDF будут линейно расти с количеством фильтров. но я думаю, это нормально, если нет хита производительности. –

+0

I * guess * Spark достаточно умен, чтобы обрабатывать только лишние столбцы (например, вычислять их, передавая данные только один раз), потому что они не имеют зависимости. Это занимает больше места, но мы говорим о длинном, это очень эффективно.Использование параллельных коллекций поражает цель Spark: Spark-разделы являются единицей параллелизма (1 исполнитель core = 1 partition), не подключайте к себе (вы только сражаетесь за процессоры с разделами Spark), просто разделяйте правильный путь. – GPI

+0

Я занимаюсь таким подходом на несколько дней. Я не думаю, что это сработает для моей ситуации. Во-первых, о параллельной коллекции этот подход будет распараллеливаться только в моей программе драйверов, а не в кластере, что я не намерен. Во-вторых, я должен динамически создавать эти UDF (я задал еще один вопрос SO). Но даже если я решила проблему создания UDF динамически, операция 'df = df.withColumn (" someFilter ", someUdf (...))' является чрезмерно длинной. Я могу иметь любое количество динамических UDF (фильтров), и это добавит много столбцов. –

 Смежные вопросы

  • Нет связанных вопросов^_^