Я хочу взять строку SQL в качестве пользовательского ввода, а затем преобразовать ее перед выполнением. В частности, я хочу изменить проекцию верхнего уровня (select clause), введя дополнительные столбцы, которые будут извлекаться по запросу.Преобразование Spark SQL AST с extraOptimizations
Я надеялся достичь этого, подключившись к Catalyst, используя sparkSession.experimental.extraOptimizations
. Я знаю, что то, что я пытаюсь, не является строго оптимизацией (преобразование изменяет семантику оператора SQL), но API по-прежнему кажется подходящим. Однако мое преобразование, по-видимому, игнорируется исполнителем запросов.
Вот минимальный пример, иллюстрирующий проблему, которую я испытываю. Сначала определим ряд случай класс:
case class TestRow(a: Int, b: Int, c: Int)
Затем определить правила оптимизации, которая просто отбрасывает любую проекцию:
object RemoveProjectOptimisationRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case x: Project => x.child
}
}
Теперь создать набор данных, зарегистрировать оптимизацию и запустить SQL запрос:
// Create a dataset and register table.
val dataset = List(TestRow(1, 2, 3)).toDS()
val tableName: String = "testtable"
dataset.createOrReplaceTempView(tableName)
// Register "optimisation".
sparkSession.experimental.extraOptimizations =
Seq(RemoveProjectOptimisationRule)
// Run query.
val projected = sqlContext.sql("SELECT a FROM " + tableName + " WHERE a = 1")
// Print query result and the queryExecution object.
println("Query result:")
projected.collect.foreach(println)
println(projected.queryExecution)
Вот результат:
Query result:
[1]
== Parsed Logical Plan ==
'Project ['a]
+- 'Filter ('a = 1)
+- 'UnresolvedRelation `testtable`
== Analyzed Logical Plan ==
a: int
Project [a#3]
+- Filter (a#3 = 1)
+- SubqueryAlias testtable
+- LocalRelation [a#3, b#4, c#5]
== Optimized Logical Plan ==
Filter (a#3 = 1)
+- LocalRelation [a#3, b#4, c#5]
== Physical Plan ==
*Filter (a#3 = 1)
+- LocalTableScan [a#3, b#4, c#5]
Мы видим, что результат идентичен результату исходного оператора SQL без применения преобразования. Тем не менее, при печати логических и физических планов проекция действительно удалена. Я также подтвердил (через вывод журнала отладки), что преобразование действительно вызывается.
Любые предложения относительно того, что здесь происходит? Может быть, оптимизатор просто игнорирует «оптимизацию», изменяющую семантику?
Если использование оптимизаций - не выход, может ли кто-нибудь предложить альтернативу? Все, что я действительно хочу сделать, это разобрать входной оператор SQL, преобразовать его и передать преобразованный AST в Spark для выполнения. Но, насколько я понимаю, API для этого являются частными для пакета Spark sql
. Возможно, можно использовать отражение, но я бы хотел этого избежать.
Любые указатели будут очень признательны.
Благодарим за подтверждение этого Михаэля. Вместо этого я пошел на более прямой подход к получению АСТ через частные частные методы - я опишу это в отдельном ответе. –