2014-12-24 1 views
1

Я занимаюсь трехсторонним объединением с тремя SchemaRDD (каждый из порядка миллиона записей, хранящихся в файлах Parquet на HDFS).Присоединение 3 SchemaRDD

Схема выглядит следующим образом:

  • table1 имеет четыре поля: идентификатор, GROUP_ID, t2_id и дату
  • table2 имеет три поля: ID, GROUP_ID и t3_id
  • Таблица3 имеет три поля: id, group_id и date

Я пытаюсь выяснить отношения между таблицей 1 и таблицей3 внутри группы.

SQL-запросов я бы использовал бы:

SELECT group_id, t1.id, t3.id 
    FROM table1, table2, table3 
    WHERE t1.group_id = t2.group_id and t1.t2_id = t2.id and 
    and t2.group_id = t3.group_id and t2.t3_id = t3.id and 
    t3.date < t1.date 

Однако я пытаюсь сделать это в Spark:

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
import sqlContext._ 
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} 


val tab1 = sqlContext.parquetFile("warehouse/tab1_pq") 
val tab2 = sqlContext.parquetFile("warehouse/tab2_pq") 
val tab3 = sqlContext.parquetFile("warehouse/tab3_pq") 

val relationship = tab1.as('t1). 
    join(tab2.as('t2), Inner, Some(("t2.group_id".attr === "t1.group_id".attr) && ("t2.id".attr === "t1.t2_id".attr))). 
    join(tab3.as('t3), Inner, Some(("t3.group_id".attr === "t2.group_id".attr) && ("t3.id".attr === "t2.t3_id".attr))). 
    where("t3.date".attr <= "t1.date".attr). 
    select("t1.group_id".attr, "t1.id".attr, "t3.id".attr) 

Так что это, кажется, работает - однако он работает значительно медленнее, чем импала в том же (3 единицы, EMR) кластере. Правильно ли это? Есть ли способ сделать это более результативным?

Спасибо за любую помощь

ответ

1

Я думаю, вы правы в соответствии с результатами тестов, приведенных here из Cloudera, кажется, импала хорош для такого рода запросов, но я думаю, что если вы выполняете итеративные запросы на той же таблицы, чем это будет будьте быстрее в sparksql, затем impala.