2017-01-24 8 views
2

Я импортирую данные из Федерального резервного экономического набора данных API. Каждый запрос возвращает ежедневный, еженедельный, ежемесячный или ежегодный временные ряды. Моя конечная цель - сделать выбор переменных и построить байесовскую модель, которая использует выбранные временные ряды в качестве предикторов для определенного временного ряда. Каков наилучший способ структурирования этих данных в кадре данных?Лучший способ компоновки данных TimeSeries в Spark DataFrame - Scala

В соответствии с этим documentation Я бы подумал, что мои данные должны быть изложены в «Экстантах» format. Однако мои попытки покончить с этим все в конечном итоге являются чрезмерно медленными после попытки присоединиться к тому, что составляет более 200 000 столбцов. Другой формат из приведенной ниже документации - это «TimeSeriesRDD», но импортируемый временной ряд часто не имеет перекрытия дат и варьируется от 1930 до настоящего времени. Итак, каков наилучший способ структурирования этих данных в кадре данных?

Пример того, как загрузить данные из FRED в рекомендованный вами формат, будет очень признателен!

Вот мой первый подход, который непозволительно медленно

for (seriesId <- allSeries) { 
    val series = loadSeriesFromAPI(seriesId, spark) 
    allSeries = allSeries.join(series, allSeries.col("date") === series.col(seriesId + "_date"), "outer") 
    allSeries = allSeries.drop(seriesId + "_date") 
} 

И мой второй, что я должен загрузить в данных 1 столбец и 1 строку в то время

for(row <- series) { 
    val insertStr = "%s, %g". 
    format(
     row.asInstanceOf[Map[String, Date]]("date").asInstanceOf[String], 
     parseDoubleOrZero(row.asInstanceOf[Map[String, Double]]("value").asInstanceOf[String]) 
    ) 
} 

ответ

1

Имея DataFrame с 200 000 столбцов - неплохая идея. Одна вещь, которую я бы рекомендовал, состоит в том, чтобы немного разбить проблему, не смешивая слишком много технологий:

  1. Внесение данных: насколько велики ваши ряды на самом деле? Избегайте объединения в максимально возможной степени (объединение подразумевает перетасовку, перетасовка подразумевает сеть, и это сделает все медленным). Я собирал данные с помощью Scala и сохранял их в памяти, если он подходит, если нет, я все равно собирал партии серий в Scala и конвертировал каждую партию в Spark DataFrame.
  2. Dataframe создание: Если вам удастся получить ваши данные в памяти, то вы можете попробовать следующий фрагмент кода, который будет создавать вам DataFrame:
case class Point(timestamp: Long, value: Long) 
case class Series(id: String, points: List[Point]) 

val s1 = Series("s1", List(Point(1, 100), Point(2, 200), Point(3, 100))) 
val s2 = Series("s2", List(Point(1, 1000), Point(3, 100))) 

val seriesDF = sc.parallelize(Array(s1, s2)).toDF 
seriesDF.show() 

seriesDF.select($"id", explode($"points").as("point")) 
    .select($"id", $"point.timestamp", $"point.value") 
    .show() 

Выход:

+---+--------------------+ 
| id|    points| 
+---+--------------------+ 
| s1|[[1,100], [2,200]...| 
| s2| [[1,1000], [3,100]]| 
+---+--------------------+ 
+---+---------+-----+ 
| id|timestamp|value| 
+---+---------+-----+ 
| s1|  1| 100| 
| s1|  2| 200| 
| s1|  3| 100| 
| s2|  1| 1000| 
| s2|  3| 100| 
+---+---------+-----+ 

Для более причудливый способ обработки временных рядов Я бы порекомендовал следующий проект: https://github.com/twosigma/flint

+0

Благодарим вас за понимание! Мне нравится, куда вы идете, и у вас есть пара вопросов. Во-первых, скажем, я хотел использовать случайный алгоритм на основе леса, чтобы сузить число столбцов. Как я мог это сделать без проглатывания всех данных в кадре данных? Во-вторых, нормально ли хранить большое количество данных в пакетах, похоже, что запросы данных будут намного сложнее? – jskracht