2016-10-25 9 views
1

Я хочу взять строку SQL в качестве пользовательского ввода, а затем преобразовать ее перед выполнением. В частности, я хочу изменить проекцию верхнего уровня (select clause), введя дополнительные столбцы, которые будут извлекаться по запросу.Преобразование Spark SQL AST с extraOptimizations

Я надеялся достичь этого, подключившись к Catalyst, используя sparkSession.experimental.extraOptimizations. Я знаю, что то, что я пытаюсь, не является строго оптимизацией (преобразование изменяет семантику оператора SQL), но API по-прежнему кажется подходящим. Однако мое преобразование, по-видимому, игнорируется исполнителем запросов.

Вот минимальный пример, иллюстрирующий проблему, которую я испытываю. Сначала определим ряд случай класс:

case class TestRow(a: Int, b: Int, c: Int) 

Затем определить правила оптимизации, которая просто отбрасывает любую проекцию:

object RemoveProjectOptimisationRule extends Rule[LogicalPlan] { 
    def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { 
     case x: Project => x.child 
    } 
} 

Теперь создать набор данных, зарегистрировать оптимизацию и запустить SQL запрос:

// Create a dataset and register table. 
val dataset = List(TestRow(1, 2, 3)).toDS() 
val tableName: String = "testtable" 
dataset.createOrReplaceTempView(tableName) 

// Register "optimisation". 
sparkSession.experimental.extraOptimizations = 
    Seq(RemoveProjectOptimisationRule) 

// Run query. 
val projected = sqlContext.sql("SELECT a FROM " + tableName + " WHERE a = 1") 

// Print query result and the queryExecution object. 
println("Query result:") 
projected.collect.foreach(println) 
println(projected.queryExecution) 

Вот результат:

Query result: 
[1] 

== Parsed Logical Plan == 
'Project ['a] 
+- 'Filter ('a = 1) 
    +- 'UnresolvedRelation `testtable` 

== Analyzed Logical Plan == 
a: int 
Project [a#3] 
+- Filter (a#3 = 1) 
    +- SubqueryAlias testtable 
     +- LocalRelation [a#3, b#4, c#5] 

== Optimized Logical Plan == 
Filter (a#3 = 1) 
+- LocalRelation [a#3, b#4, c#5] 

== Physical Plan == 
*Filter (a#3 = 1) 
+- LocalTableScan [a#3, b#4, c#5] 

Мы видим, что результат идентичен результату исходного оператора SQL без применения преобразования. Тем не менее, при печати логических и физических планов проекция действительно удалена. Я также подтвердил (через вывод журнала отладки), что преобразование действительно вызывается.

Любые предложения относительно того, что здесь происходит? Может быть, оптимизатор просто игнорирует «оптимизацию», изменяющую семантику?

Если использование оптимизаций - не выход, может ли кто-нибудь предложить альтернативу? Все, что я действительно хочу сделать, это разобрать входной оператор SQL, преобразовать его и передать преобразованный AST в Spark для выполнения. Но, насколько я понимаю, API для этого являются частными для пакета Spark sql. Возможно, можно использовать отражение, но я бы хотел этого избежать.

Любые указатели будут очень признательны.

ответ

3

Как вы уже догадались, это не работает, потому что мы делаем предположения, что оптимизатор не изменит результаты запроса.

В частности, мы кэшируем схему, которая выходит из анализатора (и предположим, что оптимизатор не меняет ее). При переводе строк во внешний формат мы используем эту схему и, таким образом, обрезаем столбцы в результате. Если вы сделали больше, чем обрезать (т. Е. Изменили типы данных), это может даже привести к сбою.

Как вы можете видеть в this notebook, это фактически дает результат, который вы ожидаете под обложками. В ближайшем будущем мы планируем открыть больше крючков, которые позволят вам изменить план на других этапах выполнения запроса. См. SPARK-18127 для получения более подробной информации.

+0

Благодарим за подтверждение этого Михаэля. Вместо этого я пошел на более прямой подход к получению АСТ через частные частные методы - я опишу это в отдельном ответе. –

1

Ответ Майкла Армбруста подтвердил, что такое преобразование не должно осуществляться посредством оптимизаций.

Я вместо этого использовал внутренние API в Spark, чтобы добиться преобразования, которое я хотел сейчас. Для этого требуются методы, которые являются частями пакета в Spark. Таким образом, мы можем получить к ним доступ без отражения, поставив соответствующую логику в соответствующий пакет.Наброски:

// Must be in the spark.sql package. 
package org.apache.spark.sql 

object SQLTransformer { 
    def apply(sparkSession: SparkSession, ...) = { 

     // Get the AST. 
     val ast = sparkSession.sessionState.sqlParser.parsePlan(sql) 

     // Transform the AST. 
     val transformedAST = ast match { 
      case node: Project => // Modify any top-level projection 
      ... 
     } 

     // Create a dataset directly from the AST. 
     Dataset.ofRows(sparkSession, transformedAST) 
    } 
} 

Обратите внимание, что это, конечно, может сломаться с будущими версиями Spark.

+0

Будет замечательно, если вы дадите более подробную информацию о реализации. Какой-то пример конца приветствия, это нормально. –