2017-02-17 8 views
1

В искры я хочу, чтобы иметь возможность параллелизировать несколько кадров данных.Можете ли вы установить фрейм-фильтр Spark в другой Dataframe?

Метод, который я пытаюсь, заключается в том, чтобы вложить данные в родительский фреймворк, но я не уверен в синтаксисе или если это возможно.

Например, у меня есть следующие 2 dataframes: DF1:

+-----------+---------+--------------------+------+ 
|id   |asset_id |    date| text| 
+-----------+---------+--------------------+------+ 
|20160629025|  A1|2016-06-30 11:41:...|aaa...| 
|20160423007|  A1|2016-04-23 19:40:...|bbb...| 
|20160312012|  A2|2016-03-12 19:41:...|ccc...| 
|20160617006|  A2|2016-06-17 10:36:...|ddd...| 
|20160624001|  A2|2016-06-24 04:39:...|eee...| 

df2:

+--------+--------------------+--------------+ 
|asset_id|  best_date_time| Other_fields| 
+--------+--------------------+--------------+ 
|  A1|2016-09-28 11:33:...|   abc| 
|  A1|2016-06-24 00:00:...|   edf| 
|  A1|2016-08-12 00:00:...|   hij| 
|  A2|2016-07-01 00:00:...|   klm| 
|  A2|2016-07-10 00:00:...|   nop| 

Так я хочу, чтобы объединить их, чтобы создать нечто подобное.

+--------+--------------------+-------------------+ 
|asset_id|     df1|    df2| 
+--------+--------------------+-------------------+ 
|  A1| [df1 - rows for A1]|[df2 - rows for A1]| 
|  A2| [df1 - rows for A2]|[df2 - rows for A2]| 

Обратите внимание, я не хочу, чтобы присоединиться или объединение их в том, что было бы очень скудны (я на самом деле есть около 30 dataframes и тысячи активов каждый с тысячами строк).

Затем я планирую сделать groupByKey на это так, что я получаю что-то вроде этого, что я могу вызвать функцию:

[('A1', <pyspark.resultiterable.ResultIterable object at 0x2534310>), ('A2', <pyspark.resultiterable.ResultIterable object at 0x25d2310>)] 

Я новичок искры так любая помощь очень ценится.

ответ

2

TL; DR Невозможно вложить DataFrames, но вы можете использовать сложные типы.

В этом случае вы могли бы, например (Спарк 2.0 или более поздней версии):

from pyspark.sql.functions import collect_list, struct 

df1_grouped = (df1 
    .groupBy("asset_id") 
    .agg(collect_list(struct("id", "date", "text")))) 

df2_grouped = (df2 
    .groupBy("asset_id") 
    .agg(collect_list(struct("best_date_time", "Other_fields")))) 

df1_grouped.join(df2_grouped, ["asset_id"], "fullouter") 

, но вы должны знать, что:

  • Это довольно дорого.
  • Имеет ограниченные возможности. В общем, вложенные структуры громоздки для использования и требуют сложных и дорогостоящих (особенно в PySpark) UDF.
+0

Спасибо за полезные указатели. – prk

 Смежные вопросы

  • Нет связанных вопросов^_^