2015-06-24 1 views
6

Я использую новую версию Apache Spark 1.4.4 API-интерфейсы Apache Spark для извлечения информации из статуса Twitter. JSON, в основном ориентированный на Entities Object - соответствующий часть на этот вопрос показал ниже:Как извлечь комплексные структуры JSON с использованием Apache Spark 1.4.0. Кадры данных

{ 
    ... 
    ... 
    "entities": { 
    "hashtags": [], 
    "trends": [], 
    "urls": [], 
    "user_mentions": [ 
     { 
     "screen_name": "linobocchini", 
     "name": "Lino Bocchini", 
     "id": 187356243, 
     "id_str": "187356243", 
     "indices": [ 3, 16 ] 
     }, 
     { 
     "screen_name": "jeanwyllys_real", 
     "name": "Jean Wyllys", 
     "id": 111123176, 
     "id_str": "111123176", 
     "indices": [ 79, 95 ] 
     } 
    ], 
    "symbols": [] 
    }, 
    ... 
    ... 
} 

Есть несколько примеров о том, как извлекать информацию из типов примитивов, как string, integer и т.д., - но я не мог найти что-нибудь о том, как обрабатывать эти виды комплекс структур.

Я попробовал этот код, но он по-прежнему не работает, он вызывает исключение

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 

val tweets = sqlContext.read.json("tweets.json") 

// this function is just to filter empty entities.user_mentions[] nodes 
// some tweets doesn't contains any mentions 
import org.apache.spark.sql.functions.udf 
val isEmpty = udf((value: List[Any]) => value.isEmpty) 

import org.apache.spark.sql._ 
import sqlContext.implicits._ 
case class UserMention(id: Long, idStr: String, indices: Array[Long], name: String, screenName: String) 

val mentions = tweets.select("entities.user_mentions"). 
    filter(!isEmpty($"user_mentions")). 
    explode($"user_mentions") { 
    case Row(arr: Array[Row]) => arr.map { elem => 
    UserMention(
     elem.getAs[Long]("id"), 
     elem.getAs[String]("is_str"), 
     elem.getAs[Array[Long]]("indices"), 
     elem.getAs[String]("name"), 
     elem.getAs[String]("screen_name")) 
    } 
} 

mentions.first 

исключение при попытке вызвать mentions.first:

scala>  mentions.first 
15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8) 
scala.MatchError: [List([187356243,187356243,List(3, 16),Lino Bocchini,linobocchini], [111123176,111123176,List(79, 95),Jean Wyllys,jeanwyllys_real])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) 
    at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34) 
    at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34) 
    at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55) 
    at org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81) 

Что здесь не так? Я понимаю, что это связано с типами, но пока не могу понять.

В качестве дополнительного контекста, структура отображается автоматически является:

scala> mentions.printSchema 
root 
|-- user_mentions: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- id: long (nullable = true) 
| | |-- id_str: string (nullable = true) 
| | |-- indices: array (nullable = true) 
| | | |-- element: long (containsNull = true) 
| | |-- name: string (nullable = true) 
| | |-- screen_name: string (nullable = true) 

Примечание 1: Я знаю, что можно решить эту проблему с помощью HiveQL, но я хотел бы использовать Data-кадры, когда есть так много импульса вокруг него.

SELECT explode(entities.user_mentions) as mentions 
FROM tweets 

Примечание 2:UDFval isEmpty = udf((value: List[Any]) => value.isEmpty) это некрасиво хак, и я что-то пропустил, но был единственный способ, которым я пришел, чтобы избежать NPE

+0

Я думаю, что ваш 'случай Row (обр: Array [Row])' не соответствует ввод. – elmalto

+0

Привет, @elmalto, я попробовал как «List», так и «Array», но в любом случае получаю ту же ошибку. – arjones

ответ

4

здесь является решением, работает, только с одним маленьким взломом.

Основная идея заключается в том, чтобы обойти проблему типа, объявляя список [String], а не список [Роу]:

val mentions = tweets.explode("entities.user_mentions", "mention"){m: List[String] => m} 

Это создает второй столбец под названием «упомянуть» типа «Struct»:

|   entities|    mention| 
+--------------------+--------------------+ 
|[List(),List(),Li...|[187356243,187356...| 
|[List(),List(),Li...|[111123176,111123...| 

Теперь сделайте карту(), чтобы извлечь поля внутри упоминания. GetStruct (1) вызов получает значение в столбце 1 каждой строки:

case class Mention(id: Long, id_str: String, indices: Seq[Int], name: String, screen_name: String) 
val mentionsRdd = mentions.map(
    row => 
    { 
     val mention = row.getStruct(1) 
     Mention(mention.getLong(0), mention.getString(1), mention.getSeq[Int](2), mention.getString(3), mention.getString(4)) 
    } 
) 

И превратить RDD обратно в DataFrame:

val mentionsDf = mentionsRdd.toDF() 

Там вы идете!

|  id| id_str|  indices|   name| screen_name| 
+---------+---------+------------+-------------+---------------+ 
|187356243|187356243| List(3, 16)|Lino Bocchini| linobocchini| 
|111123176|111123176|List(79, 95)| Jean Wyllys|jeanwyllys_real| 
+0

Спасибо Xinh Huynh, моя забота об этом хаке, заключается в том, что я просматриваю весь набор данных, делающий 'Row.toString()' перед извлечением элементов, у меня нет конкретных тестов, но похоже, что мы будем тратить много времени машинного времени, чтобы сделать этот шаг. Это единственная причина, по которой я не считаю ваш вопрос правильным! – arjones

-1

Попробуйте сделать это:

case Row(arr: Seq[Row]) => arr.map { elem => 
+1

Пожалуйста, добавьте несколько комментариев о своем решении о том, почему и как оно решает проблему. –

 Смежные вопросы

  • Нет связанных вопросов^_^