2016-04-05 4 views
1

Рассмотрим У меня есть ниже dataframeКак я могу конвертировать groupedData в Dataframe в R

AccountId,CloseDate 
1,2015-05-07 
2,2015-05-09 
3,2015-05-01 
4,2015-05-07 
1,2015-05-09 
1,2015-05-12 
2,2015-05-12 
3,2015-05-01 
3,2015-05-01 
3,2015-05-02 
4,2015-05-17 
1,2015-05-12 

Я хочу, чтобы сгруппировать его, основываясь на ACCOUNTID, а затем я хочу добавить еще один столбец имен date_diff, который будет содержать разницу в CloseDate между текущей строкой и предыдущей строкой. Обратите внимание, что я хочу, чтобы этот параметр date_diff вычислялся только для строк, имеющих тот же AccountId. Поэтому мне нужно сгруппировать данные, прежде чем добавить еще один столбец

Ниже код R, который я использую

df <- read.df(sqlContext, "/home/ubuntu/work/csv/sample.csv", source = "com.databricks.spark.csv", inferSchema = "true", header="true") 
    df$CloseDate <- to_date(df$CloseDate) 
    groupedData <- SparkR::group_by(df, df$AccountId) 
    SparkR::mutate(groupedData, DiffCloseDt = as.numeric(SparkR::datediff((CloseDate),(SparkR::lag(CloseDate,1))))) 

Чтобы добавить еще один столбец, я использую мутировать. Но поскольку group_by возвращает groupedData, я не могу использовать mutate здесь. Я получаю ошибку ниже

Error in (function (classes, fdef, mtable) : 
    unable to find an inherited method for function ‘mutate’ for signature ‘"GroupedData"’ 

Так как я могу конвертировать GroupedData в Dataframe, так что я могу добавить столбцы с помощью мутировать?

ответ

3

То, чего вы хотите, достичь невозможно, используя group_by. Как уже говорилось довольно много раз на SO:

group_by на DataFrame не физический группе данных. Кроме того, порядок операций после применения group_by является недетерминированным.

Для достижения желаемого выхода вы должны будете использовать оконные функции и обеспечивает явное упорядочение:

df <- structure(list(AccountId = c(1L, 2L, 3L, 4L, 1L, 1L, 2L, 3L, 
    3L, 3L, 4L, 1L), CloseDate = structure(c(3L, 4L, 1L, 3L, 4L, 
    5L, 5L, 1L, 1L, 2L, 6L, 5L), .Label = c("2015-05-01", "2015-05-02", 
    "2015-05-07", "2015-05-09", "2015-05-12", "2015-05-17"), class = "factor")), 
    .Names = c("AccountId", "CloseDate"), 
    class = "data.frame", row.names = c(NA, -12L)) 

hiveContext <- sparkRHive.init(sc) 
sdf <- createDataFrame(hiveContext, df) 
registerTempTable(sdf, "df") 

query <- "SELECT *, LAG(CloseDate, 1) OVER (
    PARTITION BY AccountId ORDER BY CloseDate 
) AS DateLag FROM df" 

dfWithLag <- sql(hiveContext, query) 

withColumn(dfWithLag, "diff", datediff(dfWithLag$CloseDate, dfWithLag$DateLag)) %>% 
    head() 

## AccountId CloseDate DateLag diff 
## 1   1 2015-05-07  <NA> NA 
## 2   1 2015-05-09 2015-05-07 2 
## 3   1 2015-05-12 2015-05-09 3 
## 4   1 2015-05-12 2015-05-12 0 
## 5   2 2015-05-09  <NA> NA 
## 6   2 2015-05-12 2015-05-09 3