Я использую новую версию Apache Spark 1.4.4 API-интерфейсы Apache Spark для извлечения информации из статуса Twitter. JSON, в основном ориентированный на Entities Object - соответствующий часть на этот вопрос показал ниже:Как извлечь комплексные структуры JSON с использованием Apache Spark 1.4.0. Кадры данных
{
...
...
"entities": {
"hashtags": [],
"trends": [],
"urls": [],
"user_mentions": [
{
"screen_name": "linobocchini",
"name": "Lino Bocchini",
"id": 187356243,
"id_str": "187356243",
"indices": [ 3, 16 ]
},
{
"screen_name": "jeanwyllys_real",
"name": "Jean Wyllys",
"id": 111123176,
"id_str": "111123176",
"indices": [ 79, 95 ]
}
],
"symbols": []
},
...
...
}
Есть несколько примеров о том, как извлекать информацию из типов примитивов, как string
, integer
и т.д., - но я не мог найти что-нибудь о том, как обрабатывать эти виды комплекс структур.
Я попробовал этот код, но он по-прежнему не работает, он вызывает исключение
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val tweets = sqlContext.read.json("tweets.json")
// this function is just to filter empty entities.user_mentions[] nodes
// some tweets doesn't contains any mentions
import org.apache.spark.sql.functions.udf
val isEmpty = udf((value: List[Any]) => value.isEmpty)
import org.apache.spark.sql._
import sqlContext.implicits._
case class UserMention(id: Long, idStr: String, indices: Array[Long], name: String, screenName: String)
val mentions = tweets.select("entities.user_mentions").
filter(!isEmpty($"user_mentions")).
explode($"user_mentions") {
case Row(arr: Array[Row]) => arr.map { elem =>
UserMention(
elem.getAs[Long]("id"),
elem.getAs[String]("is_str"),
elem.getAs[Array[Long]]("indices"),
elem.getAs[String]("name"),
elem.getAs[String]("screen_name"))
}
}
mentions.first
исключение при попытке вызвать mentions.first
:
scala> mentions.first
15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
scala.MatchError: [List([187356243,187356243,List(3, 16),Lino Bocchini,linobocchini], [111123176,111123176,List(79, 95),Jean Wyllys,jeanwyllys_real])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
at org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81)
Что здесь не так? Я понимаю, что это связано с типами, но пока не могу понять.
В качестве дополнительного контекста, структура отображается автоматически является:
scala> mentions.printSchema
root
|-- user_mentions: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: long (nullable = true)
| | |-- id_str: string (nullable = true)
| | |-- indices: array (nullable = true)
| | | |-- element: long (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- screen_name: string (nullable = true)
Примечание 1: Я знаю, что можно решить эту проблему с помощью HiveQL
, но я хотел бы использовать Data-кадры, когда есть так много импульса вокруг него.
SELECT explode(entities.user_mentions) as mentions
FROM tweets
Примечание 2:UDFval isEmpty = udf((value: List[Any]) => value.isEmpty)
это некрасиво хак, и я что-то пропустил, но был единственный способ, которым я пришел, чтобы избежать NPE
Я думаю, что ваш 'случай Row (обр: Array [Row])' не соответствует ввод. – elmalto
Привет, @elmalto, я попробовал как «List», так и «Array», но в любом случае получаю ту же ошибку. – arjones