Это может быть сделано с использованием агрегатов, но этот метод будет иметь более высокую сложность, чем метод pandas. Но вы можете добиться аналогичной производительности с помощью UDF. Это не будет столь же элегантно, как панды, но:
Предполагая, что этот набор данных праздников:
holidays = ['2016-01-03', '2016-09-09', '2016-12-12', '2016-03-03']
index = spark.sparkContext.broadcast(sorted(holidays))
и набор данные дат 2016 в dataframe:
from datetime import datetime, timedelta
dates_array = [(datetime(2016, 1, 1) + timedelta(i)).strftime('%Y-%m-%d') for i in range(366)]
from pyspark.sql import Row
df = spark.createDataFrame([Row(date=d) for d in dates_array])
ОДС можно использовать панда searchsorted
, но нужно будет установить панды на исполнителей. Insted вы можете использовать план питона так:
def nearest_holiday(date):
last_holiday = index.value[0]
for next_holiday in index.value:
if next_holiday >= date:
break
last_holiday = next_holiday
if last_holiday > date:
last_holiday = None
if next_holiday < date:
next_holiday = None
return (last_holiday, next_holiday)
from pyspark.sql.types import *
return_type = StructType([StructField('last_holiday', StringType()), StructField('next_holiday', StringType())])
from pyspark.sql.functions import udf
nearest_holiday_udf = udf(nearest_holiday, return_type)
И может быть использован с withColumn
:
df.withColumn('holiday', nearest_holiday_udf('date')).show(5, False)
+----------+-----------------------+
|date |holiday |
+----------+-----------------------+
|2016-01-01|[null,2016-01-03] |
|2016-01-02|[null,2016-01-03] |
|2016-01-03|[2016-01-03,2016-01-03]|
|2016-01-04|[2016-01-03,2016-03-03]|
|2016-01-05|[2016-01-03,2016-03-03]|
+----------+-----------------------+
only showing top 5 rows
Спасибо, это выглядит здорово. Мне нужно будет перенести его на scala, хотя:) –
Что такое операция 'sorted (holidays)', на которую вы ссылаетесь? это pyspark api? –
Это питон. Он сортирует коллекцию, поэтому в UDF я могу пройти через нее, чтобы найти соответствующие даты. – Mariusz