2017-01-25 62 views
2

В настоящее время sparklyr (0.5.1) не имеет pivot реализация для Spark.sparklyr pivot dataframe

Итак, интересно, как я могу реализовать это, используя функции invoke.

До сих пор я здесь.

iris_tbl <- copy_to(sc, iris) 
spark_dataframe(iris_tbl) %>% 
    invoke("groupBy", "Species", list()) %>% 
    invoke("pivot", "Sepal.Width", list()) %>% 
    invoke( ...) # <- how to create aggregate expression? 

И зациклился на том, как написать обобщенное выражение?

+0

Просто искал что-то подобное, а также не уверен. Кажется, это несоответствие типа для конца Java, т. Е. Не существует метода 'agg' для передаваемого типа (который предположительно задается' list() '?). Что такое аргумент 'list()' do/from? –

ответ

1

Это займет у вас половину пути туда:

library(stringi) 

sc <- spark_connect("local[*]") 
df <- data.frame(A = c("a", "b", "c"), B = c(1, 3, 5), C = c(2, 4, 6)) 
sdf <- copy_to(sc, df, overwrite =TRUE) 

Вспомогательные функции:

#' Given name, return corresponding SQL function 
sqlf <- function(f) function(x, ...) { 
    invoke_static(sc, "org.apache.spark.sql.functions", f, x, ...) 
} 

Melt функции: использование

#' @param df tbl_spark 
#' @param sc spark_connection 
#' @param id_vars id columns 
#' 
melt <- function(df, sc, id_vars, value_vars = NULL, 
    var_name = "key", value_name = "value") { 
    # Alias for the output view 
    alias <- paste(deparse(substitute(df)), stri_rand_strings(1, 10), sep = "_") 
    # Get session and JVM object 
    spark <- spark_session(sc) 
    jdf <- spark_dataframe(df) 

    # Convert characters to JVM Columns 
    j_id_vars <- lapply(id_vars, sqlf("col")) 

    # Combine columns into array<struct<key,value>> and explode 
    exploded <- sqlf("explode")(sqlf("array")(lapply(value_vars, function(x) { 
    key <- sqlf("lit")(x) %>% invoke("alias", var_name) 
    value <- sqlf("col")(x) %>% invoke("alias", value_name) 
    sqlf("struct")(list(key, value)) 
    }))) 

    # expand struct<..., struct<key, value>> into struct<..., key, value> 
    exprs <- lapply(
    c(id_vars, paste("col", c(var_name, value_name), sep = ".")), 
    sqlf("col")) 

    # Explode and register as temp table 
    jdf %>% 
    invoke("withColumn", "col", exploded) %>% 
    invoke("select", exprs) %>% 
    invoke("createOrReplaceTempView", alias) 

    dplyr::tbl(sc, alias) 
} 

Пример:

melt(sdf, sc, "A", c("B", "C")) 

## Source: query [6 x 3] 
## Database: spark connection master=local[*] app=sparklyr local=TRUE 
## 
## # A tibble: 6 x 3 
##  A key value 
## <chr> <chr> <dbl> 
## 1  a  B  1 
## 2  a  C  2 
## 3  b  B  3 
## 4  b  C  4 
## 5  c  B  5 
## 6  c  C  6