2017-01-30 12 views
1

Я пытаюсь прочитать и преобразовать файл csv как с столбцами json, так и с не-json. Мне удалось прочитать файл и поместить его в dataframe. Схема такова:Файл чтения pyspark с столбцами json и non-json

root 
|-- 'id': string (nullable = true) 
|-- 'score': string (nullable = true) 

И если я df.take(2), я получаю эти результаты:

[Row('id'=u"'AF03DCAB-EE3F-493A-ACD9-4B98F548E6F3'", 'score'=u"{'topSpeed':15.00000,'averageSpeed':5.00000,'harshBraking':0,'harshAcceleration':0,'driverRating':null,'idlingScore':70,'speedingScore':70,'brakingScore':70,'accelerationScore':70,'totalEcoScore':70 }"), Row('id'=u"'1938A2B9-5EF2-413C-A7A3-C5F324FD4089'", 'score'=u"{'topSpeed':106.00000,'averageSpeed':71.00000,'harshBraking':0,'harshAcceleration':0,'driverRating':9,'idlingScore':76,'speedingScore':87,'brakingScore':86,'accelerationScore':82,'totalEcoScore':83 }")]

колонка id является «нормальным» столбец и score столбец содержит данные в json. Я хочу разбить содержимое json на отдельные столбцы, но также нужен столбец id с остальными данными. А имеет рабочую часть кода только для бигованной колонки:

df = rawdata.select("'score'") 
df1 = df.rdd # Convert to rdd 
df2 = df1.flatMap(lambda x: x) # Flatten rows 
dfJsonScore = sqlContext.read.json(df2) 
dfJsonScore.printSchema() 
dfJsonScore.take(3) 

Это дает мне это:

root 
|-- accelerationScore: long (nullable = true) 
|-- averageSpeed: double (nullable = true) 
|-- brakingScore: long (nullable = true) 
|-- driverRating: long (nullable = true) 
|-- harshAcceleration: long (nullable = true) 
|-- harshBraking: long (nullable = true) 
|-- idlingScore: long (nullable = true) 
|-- speedingScore: long (nullable = true) 
|-- topSpeed: double (nullable = true) 
|-- totalEcoScore: long (nullable = true) 

[Row(accelerationScore=70, averageSpeed=5.0, brakingScore=70, driverRating=None, harshAcceleration=0, harshBraking=0, idlingScore=70, speedingScore=70, topSpeed=15.0, totalEcoScore=70), 
Row(accelerationScore=82, averageSpeed=71.0, brakingScore=86, driverRating=9, harshAcceleration=0, harshBraking=0, idlingScore=76, speedingScore=87, topSpeed=106.0, totalEcoScore=83), 
Row(accelerationScore=81, averageSpeed=74.0, brakingScore=85, driverRating=9, harshAcceleration=0, harshBraking=0, idlingScore=75, speedingScore=87, topSpeed=102.0, totalEcoScore=82)] 

Но я не могу заставить его работать в сочетании с колонкой идентификатора.

ответ

2

Существует from_json функция added in pyspark 2.1, которая может обрабатывать ваш случай.

Имея dataframe со следующей схемой:

>>> df.printSchema() 
root 
|-- id: string (nullable = true) 
|-- score: string (nullable = true) 

первой генерации схемы для поля JSon:

>>> score_schema = spark.read.json(df.rdd.map(lambda row: row.score)).schema 

затем использовать его в from_json:

>>> df.withColumn('score', from_json('score', score_schema)).printSchema() 
root 
|-- id: string (nullable = true) 
|-- score: struct (nullable = true) 
| |-- accelerationScore: long (nullable = true) 
| |-- averageSpeed: double (nullable = true) 
| |-- brakingScore: long (nullable = true) 
| |-- driverRating: long (nullable = true) 
| |-- harshAcceleration: long (nullable = true) 
| |-- harshBraking: long (nullable = true) 
| |-- idlingScore: long (nullable = true) 
| |-- speedingScore: long (nullable = true) 
| |-- topSpeed: double (nullable = true) 
| |-- totalEcoScore: long (nullable = true) 

EDIT

Если вы не можете использовать искры 2.1, get_json_object всегда вариант, но требует поля, чтобы быть действительным JSON, т.е. иметь " как строки разделителей вместо ' см этого примера:

df.withColumn('score', regexp_replace('score', "'", "\"")) \ 
    .select(
     'id', 
     get_json_object('score', '$.accelerationScore').alias('accelerationScore'), 
     get_json_object('score', '$.topSpeed').alias('topSpeed') 
    ).show() 

+--------------------+-----------------+--------+ 
|     id|accelerationScore|topSpeed| 
+--------------------+-----------------+--------+ 
|AF03DCAB-EE3F-493...|    70| 15.0| 
|1938A2B9-5EF2-413...|    82| 106.0| 
+--------------------+-----------------+--------+ 
+0

Спасибо за ваш ответ , К сожалению, мы все еще находимся на pyspark 2.0, поэтому мне придется искать альтернативное решение. – Chantal

+0

См. Обновленный ответ – Mariusz

+0

Спасибо за обновление. Это отлично работает для того случая, над которым я работаю. – Chantal