Моя команда создает процесс ETL для загрузки сырых текстовых файлов с разделителями в паркетное «озеро данных» с использованием Spark. Одно из обещаний магазина столбцов Parquet заключается в том, что запрос будет читать только необходимые «столбчатые полосы».Почему Apache Spark считывает ненужные столбцы паркета внутри вложенных структур?
Но мы видим, что для вложенных структур схемы читаются неожиданные столбцы.
Чтобы продемонстрировать, здесь РОС с помощью Scala и Спарк 2.0.1 оболочки:
// Preliminary setup
sc.setLogLevel("INFO")
import org.apache.spark.sql.types._
import org.apache.spark.sql._
// Create a schema with nested complex structures
val schema = StructType(Seq(
StructField("F1", IntegerType),
StructField("F2", IntegerType),
StructField("Orig", StructType(Seq(
StructField("F1", StringType),
StructField("F2", StringType))))))
// Create some sample data
val data = spark.createDataFrame(
sc.parallelize(Seq(
Row(1, 2, Row("1", "2")),
Row(3, null, Row("3", "ABC")))),
schema)
// Save it
data.write.mode(SaveMode.Overwrite).parquet("data.parquet")
Затем мы читаем файл обратно в DataFrame и проект подмножества столбцов:
// Read it back into another DataFrame
val df = spark.read.parquet("data.parquet")
// Select & show a subset of the columns
df.select($"F1", $"Orig.F1").show
Когда это работает, мы видим ожидаемый результат:
+---+-------+
| F1|Orig_F1|
+---+-------+
| 1| 1|
| 3| 3|
+---+-------+
... Но план запроса показывает немного диффер т история:
"оптимизированный план" показывает:
val projected = df.select($"F1", $"Orig.F1".as("Orig_F1"))
projected.queryExecution.optimizedPlan
// Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- Relation[F1#18,F2#19,Orig#20] parquet
И "объяснить" показывает:
projected.explain
// == Physical Plan ==
// *Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- *Scan parquet [F1#18,Orig#20] Format: ParquetFormat, InputPaths: hdfs://sandbox.hortonworks.com:8020/user/stephenp/data.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<F1:int,Orig:struct<F1:string,F2:string>>
И журналы INFO, полученные в ходе выполнения также подтверждают, что столбец Orig.F2 является неожиданно читаем:
16/10/21 15:13:15 INFO parquet.ParquetReadSupport: Going to read the following fields from the Parquet file:
Parquet form:
message spark_schema {
optional int32 F1;
optional group Orig {
optional binary F1 (UTF8);
optional binary F2 (UTF8);
}
}
Catalyst form:
StructType(StructField(F1,IntegerType,true), StructField(Orig,StructType(StructField(F1,StringType,true), StructField(F2,StringType,true)),true))
Согласно Dremel paper и Parquet documentation, столбцы для сложных вложенных структур должны независимо храниться и независимо извлекаться.
Вопросы:
- Является ли это поведение ограничение тока двигателя Спарк запроса? Другими словами, поддерживает ли Parquet оптимальное выполнение этого запроса, но планировщик запросов Spark наивен?
- Или это ограничение текущей реализации Паркета?
- Или я не использую API Spark правильно?
- Или я не понимаю, как должно работать хранилище столбцов Dremel/Parquet?
Возможно, связанный с: Why does the query performance differ with nested columns in Spark SQL?
Это проблема двигателя запросов Spark. –
@LostInOverflow, знаете ли вы, есть ли это в трекере Spark? https://issues.apache.org/jira/browse/SPARK/?selectedTab=com.atlassian.jira.jira-projects-plugin:issues-panel –
Похоже, что Паркет должен поддерживать этот сценарий в соответствии с @ julien-le- dem https://twitter.com/J_/status/789584704169123841 –