Если я правильно понял вопрос правильно - вы хотите сравнить (с помощью функции) каждое значение к значению того же столбца в предыдущей записи. Вы можете сделать это с помощью lag
окна Функции:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import spark.implicits._
// some data...
val df = Seq(
(1, "A1", "B1"),
(2, "A2", "B2"),
(3, "A3", "B3")
).toDF("ID","COL1", "COL2")
// some made-up comparisons - fill in whatever you want...
def compareCol1(curr: Column, prev: Column): Column = curr > prev
def compareCol2(curr: Column, prev: Column): Column = concat(curr, prev)
// creating window - ordered by ID
val window = Window.orderBy("ID")
// using the window with lag function to compare to previous value in each column
df.withColumn("COL1-comparison", compareCol1($"COL1", lag("COL1", 1).over(window)))
.withColumn("COL2-comparison", compareCol2($"COL2", lag("COL2", 1).over(window)))
.show()
// +---+----+----+---------------+---------------+
// | ID|COL1|COL2|COL1-comparison|COL2-comparison|
// +---+----+----+---------------+---------------+
// | 1| A1| B1| null| null|
// | 2| A2| B2| true| B2B1|
// | 3| A3| B3| true| B3B2|
// +---+----+----+---------------+---------------+
Я не понимаю, если у вас есть для сравнения значений в разных столбцах друг с другом на основе каждого ряда или, если вы хотите сравнить две последующие строки друг с другом. – stefanobaghino
Что вы имеете в виду при сравнении? вы хотите добавить булевский столбец, является ли текущий тот же, что и предыдущий? И что вы заказываете? id? они последовательны? вам нужно пройти все (т. е. есть ли еще какой-то другой столбец, который вы группируете первым?) –
добавьте, по крайней мере, ожидаемый результат и даже лучше, если вы объясните пример использования и добавите код, который вы пробовали до сих пор , – maasg