2015-12-11 4 views
5

Я использую PySpark. У меня есть столбец ('dt') в dataframe ('canon_evt'), что это временная метка. Я пытаюсь удалить секунды из значения DateTime. Он изначально читается из паркета как String. Затем я пытаюсь преобразовать его в отметку времени черезPySpark 1.5 Как сократить временную метку до ближайшей минуты с секунд

canon_evt = canon_evt.withColumn('dt',to_date(canon_evt.dt)) 
canon_evt= canon_evt.withColumn('dt',canon_evt.dt.astype('Timestamp')) 

Тогда я хотел бы удалить секунды. Я попробовал «trunc», «date_format» или даже попытался объединить фрагменты, как показано ниже. Я думаю, что для этого требуется какая-то карта и комбинация лямбда, но я не уверен, является ли Timestamp подходящим форматом и можно ли избавиться от секунд.

canon_evt = canon_evt.withColumn('dyt',year('dt') + '-' + month('dt') + 
    '-' + dayofmonth('dt') + ' ' + hour('dt') + ':' + minute('dt')) 

[Row(dt=datetime.datetime(2015, 9, 16, 0, 0),dyt=None)] 
+0

Не могли бы вы опубликовать, как это выглядит, когда вы читаете паркет? – WoodChopper

+0

[Row (dt = '2015-09-16 05:39:46')], Row (dt = '2015-09-16 05:40:46')] – PR102012

+0

'zero323', спасибо за супер быстрый Помогите! – PR102012

ответ

6

Преобразования в Unix временных метки и основную арифметику должна к уловке:

from pyspark.sql import Row 
from pyspark.sql.functions import col, unix_timestamp, round 

df = sc.parallelize([ 
    Row(dt='1970-01-01 00:00:00'), 
    Row(dt='2015-09-16 05:39:46'), 
    Row(dt='2015-09-16 05:40:46'), 
    Row(dt='2016-03-05 02:00:10'), 
]).toDF() 


## unix_timestamp converts string to Unix timestamp (bigint/long) 
## in seconds. Divide by 60, round, multiply by 60 and cast 
## should work just fine. 
## 
dt_truncated = ((round(unix_timestamp(col("dt"))/60) * 60) 
    .cast("timestamp")) 

df.withColumn("dt_truncated", dt_truncated).show(10, False) 
## +-------------------+---------------------+ 
## |dt     |dt_truncated   | 
## +-------------------+---------------------+ 
## |1970-01-01 00:00:00|1970-01-01 00:00:00.0| 
## |2015-09-16 05:39:46|2015-09-16 05:40:00.0| 
## |2015-09-16 05:40:46|2015-09-16 05:41:00.0| 
## |2016-03-05 02:00:10|2016-03-05 02:00:00.0| 
## +-------------------+---------------------+ 
+0

Если бы у меня был доступ к Spark 1.3 и, следовательно, функция noix_timestamp, было бы все же легко выполнить в Spark SQL или DataFrame? – PR102012

+0

Просто используйте [Hive UDF] (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions) – zero323

1

Я думаю zero323 имеет лучший ответ. Это раздражает, что Spark не поддерживает это изначально, учитывая, как легко его реализовать. Для потомков, вот функция, которую я использую:

def trunc(date, format): 
    """Wraps spark's trunc fuction to support day, minute, and hour""" 
    import re 
    import pyspark.sql.functions as func 

    # Ghetto hack to get the column name from Column object or string: 
    try: 
     colname = re.match(r"Column<.?'(.*)'>", str(date)).groups()[0] 
    except AttributeError: 
     colname = date 

    alias = "trunc(%s, %s)" % (colname, format) 

    if format in ('year', 'YYYY', 'yy', 'month', 'mon', 'mm'): 
     return func.trunc(date, format).alias(alias) 
    elif format in ('day', 'DD'): 
     return func.date_sub(date, 0).alias(alias) 
    elif format in ('min',): 
     return ((func.round(func.unix_timestamp(date)/60) * 60).cast("timestamp")).alias(alias) 
    elif format in ('hour',): 
     return ((func.round(func.unix_timestamp(date)/3600) * 3600).cast("timestamp")).alias(alias) 
+0

Спасибо! Ваш ответ дал мне именно то, что я хотел найти. – Paul