2016-03-30 5 views
2

Я хочу написать пользовательскую функцию группировки и агрегации, чтобы получить заданные пользователем имена столбцов и заданную пользователем карту агрегации. Я не знаю названия столбцов и карту агрегации вверх. Я хочу написать функцию, подобную приведенной ниже. Но я новичок в Scala, и я не могу ее решить.Scala-Spark Динамически вызывать groupby и agg с значениями параметров

def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame ={ 
    val grouped = df.groupBy(cols) 
    val aggregated = grouped.agg(aggregateFun) 
    aggregated.show() 
} 

и хотите назвать это как

val listOfStrings = List("A", "B", "C") 
val result = groupAndAggregate(df, Map("D"-> "SUM", "E"-> "COUNT"), listOfStrings) 

Как я могу это сделать? Может кто-нибудь мне помочь.

ответ

4

Ваш код почти правильно - с двумя вопросами:

  1. Возвращаемый тип вашей функции DataFrame, но последняя строка aggregated.show(), которая возвращает Unit. Удалить вызов show вернуть aggregated себя, или просто вернуть результат agg сразу

  2. DataFrame.groupBy ожидает аргументы следующим образом: col1: String, cols: String* - так что вы должны передать соответствующие аргументы: первые столбцы, а затем остальные столбцы как список аргументов, вы можете сделать это следующим образом: df.groupBy(cols.head, cols.tail: _*)

в общем, ваша функция будет:

def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame ={ 
    val grouped = df.groupBy(cols.head, cols.tail: _*) 
    val aggregated = grouped.agg(aggregateFun) 
    aggregated 
} 

Или аналогичный сокращенный вариант:

def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame = { 
    df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun) 
} 

Если вы сделать хотите позвонить show в вашей функции:

def groupAndAggregate(df: DataFrame, aggregateFun: Map[String, String], cols: List[String]): DataFrame ={ 
    val grouped = df.groupBy(cols.head, cols.tail: _*) 
    val aggregated = grouped.agg(aggregateFun) 
    aggregated.show() 
    aggregated 
} 
+0

Большое спасибо за это. Да. 'df.groupBy (cols.head, cols.tail: _ *)' это то, о чем я не мог думать. Вторая версия - это то, что мне нужно. Остальное - только для локального тестирования. – NehaM

+0

Для меня это работало как val key = List ("key1", "key2") val grouped = df.groupBy (cols.head, cols: _ *) – Nitin