2016-07-06 1 views
3

Я пытаюсь сравнить одну строку в кадре данных со следующей, чтобы увидеть разницу в метке времени. В настоящее время данные выглядит следующим образом:pyspark, Сравните две строки в dataframe

itemid | eventid | timestamp 
---------------------------- 
134 | 30  | 2016-07-02 12:01:40 
134 | 32  | 2016-07-02 12:21:23 
125 | 30  | 2016-07-02 13:22:56 
125 | 32  | 2016-07-02 13:27:07 

Я пытался отображение функции на dataframe, чтобы для сравнения, как это: (примечание: Я пытаюсь получить строки с разницей больше, чем 4 часа)

items = df.limit(10)\ 
      .orderBy('itemid', desc('stamp'))\ 
      .map(lambda x,y: (x.stamp - y.stamp) > 14400).collect() 

Но я получаю следующее сообщение об ошибке:

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe 

Что я считаю, из-за моей помощью функции карты неправильно. Помощь с использованием карты или другое решение будет оценено по достоинству.

UPDATE: @ zero323 в ответ был информативен на моем неправильном использовании отображения, однако система я использую под управлением версии Spark, прежде, чем 2,02, и я работаю с данными в Кассандре.

Мне удалось решить это с помощью mapPartitions. См. Мой ответ ниже.

UPDATE (2017/03/27): Поскольку первоначально маркировка ответ на этот пост мое понимание Спарк значительно улучшилось. Я обновил свой ответ ниже, чтобы показать свое текущее решение.

ответ

-1

Комментарий от @ShuaiYuan на исходном ответе правильный. За последний год я разработал гораздо лучшее представление о том, как работает Spark, и фактически переписал программу, над которой я работал на этот пост.

НОВЫЙ ОТВЕТ (2017/03/27)
Для выполнения сравнения двух строк из dataframe я в конечном итоге, используя RDD. Я группирую данные по ключу (в этом случае id объекта) и игнорирую eventid, поскольку в этом уравнении это не имеет значения. Затем я отображаю лямбда-функцию на строки, возвращая кортеж ключа и список кортежей, содержащих начальные и конечные промежутки событий, которые производятся из функции «findGaps», которая выполняет итерацию по списку значений (отсортированные временные метки), связанным для каждого ключа. Как только это будет завершено, я отфильтровываю ключи без временных разрывов, а затем flatMapValues, чтобы вернуть данные в более похожий формат sql. Это делается с помощью следующего кода:

# Find time gaps in list of datetimes where firings are longer than given duration. 
def findGaps(dates, duration): 
    result = [] 
    length = len(dates) 

    # convert to dates for comparison 
    first = toDate(dates[0]) 
    last = toDate(dates[length - 1]) 
    for index, item in enumerate(dates): 
     if index < length -1 and (dates[index + 1] - item).total_seconds() > duration: 
      # build outage tuple and append to list 
      # format (start, stop, duration) 
      result.append(formatResult(item, dates[index + 1], kind)) 
    return result 

outage_list = outage_join_df.rdd\ 
          .groupByKey()\ 
          .map(lambda row: (
            row[0], 
            findGaps(
             sorted(list(row[1])), 
             limit 
            ) 
           ) 
          )\ 
          .filter(lambda row: len(row[1]) > 0)\ 
          .flatMapValues(lambda row: row)\ 
          .map(lambda row: (
           row[0]['itemid'],  # itemid 
           row[1][0].date(),  # date 
           row[1][0],   # start 
           row[1][1],   # stop 
           row[1][2]    # duration 
          ))\ 
          .collect() 

ОРИГИНАЛЬНЫЙ ОТВЕТ (НЕПРАВИЛЬНО)
мне удалось решить с помощью mapPartitions:

def findOutage(items): 
    outages = [] 

    lastStamp = None 
    for item in items: 
     if lastStamp and (lastStamp - item.stamp).total_seconds() > 14400: 
      outages.append({"item": item.itemid, 
          "start": item.stamp.isoformat(), 
          "stop": lastStamp.isoformat()}) 
     lastStamp = item.stamp 
    return iter(outages) 

items = df.limit(10).orderBy('itemid', desc('stamp')) 

outages = items.mapPartitions(findOutage).collect() 

Спасибо всем за помощь!

+1

Нужно убедиться, что набор данных разделен на 'timestamp'. – ShuaiYuan

+0

@ShuaiYuan вы правы. Я обновил свой ответ, чтобы показать свое текущее решение проблемы. – phelpsiv

6

Да, вы используете функцию map неправильно. map работает на одном элементе в то время. Вы можете попытаться использовать такие функции окна, как это:

from pyspark.sql.functions import col, lag 
from pyspark.sql.window import Window 

df = (
    sc.parallelize([ 
     (134, 30, "2016-07-02 12:01:40"), (134, 32, "2016-07-02 12:21:23"), 
     (125, 30, "2016-07-02 13:22:56"), (125, 32, "2016-07-02 13:27:07"), 
    ]).toDF(["itemid", "eventid", "timestamp"]) 
    .withColumn("timestamp", col("timestamp").cast("timestamp")) 
) 

w = Window.partitionBy("itemid").orderBy("timestamp") 

diff = col("timestamp").cast("long") - lag("timestamp", 1).over(w).cast("long") 

df.withColumn("diff", diff) 
+1

Хотя убедитесь, что вы используете HiveContext или Spark 2.02 здесь. – Jeff

+0

Спасибо за понимание карты, но, видимо, для использования окна требуется контекст Hive. Система, с которой я работаю, просто искра с Кассандрой. Я уточню вопрос, чтобы отметить это. – phelpsiv

+0

'HiveContext' не требует, чтобы улей за просмотр. Для этого требуется только Spark, построенный с поддержкой Hive (который используется по умолчанию в случае двоичных файлов предварительной сборки). – zero323

 Смежные вопросы

  • Нет связанных вопросов^_^