2016-11-22 3 views
0

В панде У меня есть функция, похожая наискра расстояние до ближайшего SQL праздника

indices = df.dateColumn.apply(holidays.index.searchsorted) 
df['nextHolidays'] = holidays.index[indices] 
df['previousHolidays'] = holidays.index[indices - 1] 

, который вычисляет расстояние до ближайшего праздника и магазинов, как новый столбец.

searchsortedhttp://pandas.pydata.org/pandas-docs/version/0.18.1/generated/pandas.Series.searchsorted.html было отличным решением для панд, так как это дает мне индекс следующего праздника без высокой алгоритмической сложности Parallelize pandas apply, например. этот подход был намного быстрее, чем параллельный цикл.

Как я могу достичь этого в искрах или ульях?

ответ

1

Это может быть сделано с использованием агрегатов, но этот метод будет иметь более высокую сложность, чем метод pandas. Но вы можете добиться аналогичной производительности с помощью UDF. Это не будет столь же элегантно, как панды, но:

Предполагая, что этот набор данных праздников:

holidays = ['2016-01-03', '2016-09-09', '2016-12-12', '2016-03-03'] 
index = spark.sparkContext.broadcast(sorted(holidays)) 

и набор данные дат 2016 в dataframe:

from datetime import datetime, timedelta 
dates_array = [(datetime(2016, 1, 1) + timedelta(i)).strftime('%Y-%m-%d') for i in range(366)] 
from pyspark.sql import Row 
df = spark.createDataFrame([Row(date=d) for d in dates_array]) 

ОДС можно использовать панда searchsorted, но нужно будет установить панды на исполнителей. Insted вы можете использовать план питона так:

def nearest_holiday(date): 
    last_holiday = index.value[0] 
    for next_holiday in index.value: 
     if next_holiday >= date: 
      break 
     last_holiday = next_holiday 
    if last_holiday > date: 
     last_holiday = None 
    if next_holiday < date: 
     next_holiday = None 
    return (last_holiday, next_holiday) 


from pyspark.sql.types import * 
return_type = StructType([StructField('last_holiday', StringType()), StructField('next_holiday', StringType())]) 

from pyspark.sql.functions import udf 
nearest_holiday_udf = udf(nearest_holiday, return_type) 

И может быть использован с withColumn:

df.withColumn('holiday', nearest_holiday_udf('date')).show(5, False) 

+----------+-----------------------+ 
|date  |holiday    | 
+----------+-----------------------+ 
|2016-01-01|[null,2016-01-03]  | 
|2016-01-02|[null,2016-01-03]  | 
|2016-01-03|[2016-01-03,2016-01-03]| 
|2016-01-04|[2016-01-03,2016-03-03]| 
|2016-01-05|[2016-01-03,2016-03-03]| 
+----------+-----------------------+ 
only showing top 5 rows 
+0

Спасибо, это выглядит здорово. Мне нужно будет перенести его на scala, хотя:) –

+0

Что такое операция 'sorted (holidays)', на которую вы ссылаетесь? это pyspark api? –

+0

Это питон. Он сортирует коллекцию, поэтому в UDF я могу пройти через нее, чтобы найти соответствующие даты. – Mariusz