2017-01-25 1 views
0

Я новичок в scala. Мне нужна немедленная помощь.Как перевести строки и сравнить значение столбца строки со значением следующего столбца строки в Scala?

У меня есть M * N искра sql dataframe что-то вроде ниже. Мне нужно сравнить значения столбцов строки со значением следующего столбца строки.

Некоторые вещи, например, от A1 до A2, от A1 до A3, и так далее до N. B1 - B2 B1 - B3.

Можете ли вы кого-нибудь, пожалуйста, направить меня, как можно сравнить ряд мудрости в искровом sql?

ID COLUMN1 Column2 
1 A1 B1 
2 A2 B2 
3 A3 B3 

Спасибо заранее Santhosh

+0

Я не понимаю, если у вас есть для сравнения значений в разных столбцах друг с другом на основе каждого ряда или, если вы хотите сравнить две последующие строки друг с другом. – stefanobaghino

+0

Что вы имеете в виду при сравнении? вы хотите добавить булевский столбец, является ли текущий тот же, что и предыдущий? И что вы заказываете? id? они последовательны? вам нужно пройти все (т. е. есть ли еще какой-то другой столбец, который вы группируете первым?) –

+0

добавьте, по крайней мере, ожидаемый результат и даже лучше, если вы объясните пример использования и добавите код, который вы пробовали до сих пор , – maasg

ответ

0

Если я правильно понял вопрос правильно - вы хотите сравнить (с помощью функции) каждое значение к значению того же столбца в предыдущей записи. Вы можете сделать это с помощью lagокна Функции:

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.Column 
import org.apache.spark.sql.functions._ 
import spark.implicits._ 

// some data... 
val df = Seq(
    (1, "A1", "B1"), 
    (2, "A2", "B2"), 
    (3, "A3", "B3") 
).toDF("ID","COL1", "COL2") 

// some made-up comparisons - fill in whatever you want... 
def compareCol1(curr: Column, prev: Column): Column = curr > prev 
def compareCol2(curr: Column, prev: Column): Column = concat(curr, prev) 

// creating window - ordered by ID 
val window = Window.orderBy("ID") 

// using the window with lag function to compare to previous value in each column 
df.withColumn("COL1-comparison", compareCol1($"COL1", lag("COL1", 1).over(window))) 
    .withColumn("COL2-comparison", compareCol2($"COL2", lag("COL2", 1).over(window))) 
    .show() 

// +---+----+----+---------------+---------------+ 
// | ID|COL1|COL2|COL1-comparison|COL2-comparison| 
// +---+----+----+---------------+---------------+ 
// | 1| A1| B1|   null|   null| 
// | 2| A2| B2|   true|   B2B1| 
// | 3| A3| B3|   true|   B3B2| 
// +---+----+----+---------------+---------------+ 
+0

Благодарим за быстрый ответ: получение ошибки Исключение в потоке «main» org.apache.spark.sql.AnalysisException: не удалось разрешить функцию окна «lag». Обратите внимание, что при использовании оконных функций в настоящее время требуется HiveContext; –

+0

Какую версию искры вы используете? Я думаю, что он должен работать готово без поддержки поддержки Hive в Spark 2.0.x –

+0

Я использую версию scala версии 2.11.8 и искру mllib_2.11 версии 1.6.2 –