2016-01-26 2 views
2

Я пытаюсь выполнить простую задачу в искровом информационном кадре (python), который создает новый dataframe, выбирая конкретные столбцы и вложенные столбцы из другого фрейма данных , например :py4j.protocol.Py4JJavaError при выборе вложенного столбца в dataframe с использованием select statetment

df.printSchema() 
root 
|-- time_stamp: long (nullable = true) 
|-- country: struct (nullable = true) 
| |-- code: string (nullable = true) 
| |-- id: long (nullable = true) 
| |-- time_zone: string (nullable = true) 
|-- event_name: string (nullable = true) 
|-- order: struct (nullable = true) 
| |-- created_at: string (nullable = true) 
| |-- creation_type: struct (nullable = true) 
| | |-- id: long (nullable = true) 
| | |-- name: string (nullable = true) 
| |-- destination: struct (nullable = true) 
| | |-- state: string (nullable = true) 
| |-- ordering_user: struct (nullable = true) 
| | |-- cancellation_score: long (nullable = true) 
| | |-- id: long (nullable = true) 
| | |-- is_test: boolean (nullable = true) 

df2=df.sqlContext.sql("""select a.country_code as country_code, 
a.order_destination_state as order_destination_state, 
a.order_ordering_user_id as order_ordering_user_id, 
a.order_ordering_user_is_test as order_ordering_user_is_test, 
a.time_stamp as time_stamp 
from 
(select 
flat_order_creation.order.destination.state as order_destination_state, 
flat_order_creation.order.ordering_user.id as order_ordering_user_id, 
flat_order_creation.order.ordering_user.is_test as order_ordering_user_is_test, 
flat_order_creation.time_stamp as time_stamp 
from flat_order_creation) a""") 

и я получаю следующее сообщение об ошибке:

Traceback (most recent call last): 
    File "/home/hadoop/scripts/orders_all.py", line 180, in <module> 
    df2=sqlContext.sql(q) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 552, in sql 
    File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco 
    File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling o60.sql. 
: java.lang.RuntimeException: [6.21] failure: ``*'' expected but `order' found 

flat_order_creation.order.destination.state as order_destination_state, 

Я использую искровой подавать с мастером в локальном режиме, чтобы запустить этот код. Важно отметить, что когда я подключаюсь к оболочке pyspark и запускаю код (строка за строкой), он работает, но при отправке его (даже в локальном режиме) он терпит неудачу. Важно отметить, что при выборе не вложенного поля это работает. Я использую искру 1.5.2 на ОМ (версию 4.2.0)

ответ

4

Без a Minimal, Complete, and Verifiable example я могу только догадываться, но это выглядит, как вы используете различные SparkContext реализации в интерактивной оболочке и вашей отдельной программе.

До тех пор, пока двоичные файлы Spark были построены с поддержкой Hive sqlContext, предусмотренными в оболочке, являются HiveContext. Среди других различий он предоставляет более сложный синтаксический анализатор SQL, чем простой SQLContext. Вы можете легко воспроизвести вашу проблему следующим образом:

import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.hive.HiveContext 

val conf: SparkConf = ??? 
val sc: SparkContext = ??? 
val query = "SELECT df.foobar.order FROM df" 

val hiveContext: SQLContext = new HiveContext(sc) 
val sqlContext: SQLContext = new SQLContext(sc) 
val json = sc.parallelize(Seq("""{"foobar": {"order": 1}}""")) 

sqlContext.read.json(json).registerTempTable("df") 
sqlContext.sql(query).show 
// java.lang.RuntimeException: [1.18] failure: ``*'' expected but `order' found 
// ... 

hiveContext.read.json(json).registerTempTable("df") 
hiveContext.sql(query) 
// org.apache.spark.sql.DataFrame = [order: bigint] 

Низкоур.инициализ sqlContext с HiveContext в отдельной программе следует сделать трюк:

from pyspark.sql import HiveContext 

sqlContext = HiveContext(sc) 

df = sqlContext.createDataFrame(...) 
df.registerTempTable("flat_order_creation") 

sqlContext.sql(...) 

Важно отметить, что проблема не гнездятся себя, но с использованием ORDER ключевое слово в качестве имени столбца. Поэтому, если использование HiveContext не является вариантом, просто измените имя поля на что-то еще.

+0

работ !! благодаря! –