2017-01-06 4 views
1

Я пытаюсь объединить несколько таблиц MySQL в искровом режиме. Некоторые из этих таблиц имеют повторяющиеся имена столбцов (каждая таблица имеет поле идентификатора, относящееся к этой таблице).Объединение таблиц с именами повторяющихся столбцов в искровом

Если я пытаюсь запустить:

val myDF = session.read.jdbc("t1 inner join t2 on t1.t2_id = t2.id, queryTable, prop) 
myDF.show 

Я получаю java.sql.SQLIntegrityConstraintViolationException: Column 'id' in field list is ambiguous, поскольку обе таблицы имеют поле идентификатора (с разными значениями)

Я пытался делать:

val t1DF = spark.read.jdbc(dbstring, "t1", "id").alias("a") 
val t2DF = spark.read.jdbc(dbstring, "t2", "id").alias("b") 
val joinedDF = t1DF.join(t2DF, Seq("a.t2_id", "b.id")) 
    .selectExpr("ent.id as entity_id", "lnk.pagerank") 

Я получил error org.apache.spark.sql.AnalysisException: using columns ['t1.t2_id,'t2.id] can not be resolved given input columns: [..] Казалось бы, анализатор не знает, как обращаться с псевдонимами.

Единственный вариант, который, кажется, работает как с использованием подзапроса:

spark.read.jdbc(dbstring, "(select t1.id as t1_id, t1.t2_id from 
t1 inner join t2 on t1.t2_id = t2.id) t", "t2_id") 

Хотя в этом случае подзапрос нужно будет завершить работу, прежде чем я могу сделать какие-либо фильтры, что делает вещи не-благоугодно медленно и любые беспорядок запроса.

Возможно, у Spark есть внутренний способ устранения неоднозначности между идентификаторами id#528 и id#570, но я не могу найти способ ссылаться на них в инструкции select.

ответ

0

У меня была та же проблема. Единственный способ, которым я решил решить эту проблему, - добавить суффикс в имена столбцов. Это выглядит примерно так:

val t1DF = spark.read.jdbc(dbstring, "t1", "id").select(col("id").alias("id_t1")) 
val t2DF = spark.read.jdbc(dbstring, "t2", "id").select(col("id").alias("id_t2")) 

val joinedDF = t1DF.join(t2DF, t1DF("id_t1") === t2DF("id_t2"))