10

У меня есть таблица Кассандры, что для простоты выглядит примерно так:Как запросить столбцы данных JSON, используя Spark DataFrames?

key: text 
jsonData: text 
blobData: blob 

Я могу создать базовый фрейм данных для этого с помощью искры и свечи-Кассандры разъема с помощью:

val df = sqlContext.read 
    .format("org.apache.spark.sql.cassandra") 
    .options(Map("table" -> "mytable", "keyspace" -> "ks1")) 
    .load() 

Я хотя и пытается расширить данные JSON в своей базовой структуре. В конечном итоге я хочу иметь возможность фильтровать на основе атрибутов в строке json и возвращать данные blob. Что-то вроде jsonData.foo = "bar" и return blobData. Возможно ли это в настоящее время?

+0

Является ли 'ключ' уникальным идентификатором? – zero323

+0

Да, ключ является основным ключом таблицы – JDesuv

ответ

27

Спарк 2.1+

Вы можете использовать from_json функцию:

import org.apache.spark.sql.functions.from_json 
import org.apache.spark.sql.types._ 

val schema = StructType(Seq(
    StructField("k", StringType, true), StructField("v", DoubleType, true) 
)) 

df.withColumn("jsonData", from_json($"jsonData", schema)) 

Спарк 1.6+

Вы можете использовать get_json_object, который принимает столбец и путь:

import org.apache.spark.sql.functions.get_json_object 

val exprs = Seq("k", "v").map(
    c => get_json_object($"jsonData", s"$$.$c").alias(c)) 

df.select($"*" +: exprs: _*) 

и извлекает поля для отдельных строк, которые могут быть добавлены к ожидаемым типам.

Спарк < = 1,5:

это возможно в настоящее время Является ли?

Насколько я знаю, это невозможно. Вы можете попробовать что-то похожее на это:

val df = sc.parallelize(Seq(
    ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"), 
    ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2") 
)).toDF("key", "jsonData", "blobData") 

Я предполагаю, что blob поле не может быть представлена ​​в формате JSON. В противном случае вы кабина опускает расщеплению и присоединение:

import org.apache.spark.sql.Row 

val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey") 
val jsons = sqlContext.read.json(df.drop("blobData").map{ 
    case Row(key: String, json: String) => 
    s"""{"key": "$key", "jsonData": $json}""" 
}) 

val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey") 
parsed.printSchema 

// root 
// |-- jsonData: struct (nullable = true) 
// | |-- k: string (nullable = true) 
// | |-- v: double (nullable = true) 
// |-- key: long (nullable = true) 
// |-- blobData: string (nullable = true) 

Альтернативу (дешевле, хотя и более сложный) подход заключается в использовании UDF для разбора JSON и вывести столбец struct или map. Например, что-то вроде этого:

import net.liftweb.json.parse 

case class KV(k: String, v: Int) 

val parseJson = udf((s: String) => { 
    implicit val formats = net.liftweb.json.DefaultFormats 
    parse(s).extract[KV] 
}) 

val parsed = df.withColumn("parsedJSON", parseJson($"jsonData")) 
parsed.show 

// +---+--------------------+------------------+----------+ 
// |key|   jsonData|   blobData|parsedJSON| 
// +---+--------------------+------------------+----------+ 
// | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]| 
// | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]| 
// +---+--------------------+------------------+----------+ 

parsed.printSchema 

// root 
// |-- key: string (nullable = true) 
// |-- jsonData: string (nullable = true) 
// |-- blobData: string (nullable = true) 
// |-- parsedJSON: struct (nullable = true) 
// | |-- k: string (nullable = true) 
// | |-- v: integer (nullable = false) 
0

, лежащее в основе JSON струну

"{ \"column_name1\":\"value1\",\"column_name2\":\"value2\",\"column_name3\":\"value3\",\"column_name5\":\"value5\"}"; 

Ниже приведен сценарий для фильтрации JSON и загрузить необходимые данные в Кассандре.

sqlContext.read.json(rdd).select("column_name1 or fields name in Json", "column_name2","column_name2") 
      .write.format("org.apache.spark.sql.cassandra") 
      .options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name")) 
      .mode(SaveMode.Append) 
      .save() 
1

Функция from_json - это именно то, что вы ищете. Ваш код будет выглядеть примерно так:

val df = sqlContext.read 
    .format("org.apache.spark.sql.cassandra") 
    .options(Map("table" -> "mytable", "keyspace" -> "ks1")) 
    .load() 

//You can define whatever struct type that your json states 
val schema = StructType(Seq(
    StructField("key", StringType, true), 
    StructField("value", DoubleType, true) 
)) 

df.withColumn("jsonData", from_json(col("jsonData"), schema))