2017-02-21 27 views
1

Привет, для моего конкретного требования, я хочу написать UDAF, которые просто собирают все входные строки.Как написать Spark UDAF, который просто делает сборку строк?

Входные данные представляют собой строки из двух столбцов, Double Type;

Промежуточные схемы, «Я думал», это ArrayList (поправьте меня, если я ошибаюсь)

возвращаемый тип данных является ArrayList

Я написал «идею» моего UDAF, но я попросите кого-нибудь помочь мне закончить его.

class CollectorUDAF() extends UserDefinedAggregateFunction { 

    // Input Data Type Schema 
    def inputSchema: StructType = StructType(Array(StructField("value", DoubleType), StructField("y", DoubleType))) 

    // Intermediate Schema 
    def bufferSchema = util.ArrayList[Array(StructField("value", DoubleType), StructField("y", DoubleType)] 

    // Returned Data Type . 
    def dataType: DataType = util.ArrayList[Array(StructField("value", DoubleType), StructField("y", DoubleType)] 

    // Self-explaining 
    def deterministic = true 

    // This function is called whenever key changes 
    def initialize(buffer: MutableAggregationBuffer) = { 

    } 

    // Iterate over each entry of a group 
    def update(buffer: MutableAggregationBuffer, input: Row) = { 


    } 

    // Called after all the entries are exhausted. 
    def evaluate(buffer: Row) = { 

    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { 

    } 

}

ответ

2

Если я понял ваш вопрос правильно, следующий должен быть ваше решение:

class CollectorUDAF() extends UserDefinedAggregateFunction { 

    // Input Data Type Schema 
    def inputSchema: StructType = new StructType().add("value", DataTypes.DoubleType).add("y", DataTypes.DoubleType) 

    // Intermediate Schema 
    val bufferFields : util.ArrayList[StructField] = new util.ArrayList[StructField] 
    val bufferStructField : StructField = DataTypes.createStructField("array", DataTypes.createArrayType(DataTypes.StringType, true), true) 
    bufferFields.add(bufferStructField) 
    def bufferSchema: StructType = DataTypes.createStructType(bufferFields) 

    // Returned Data Type . 
    def dataType: DataType = DataTypes.createArrayType(DataTypes.DoubleType) 

    // Self-explaining 
    def deterministic = true 

    // This function is called whenever key changes 
    def initialize(buffer: MutableAggregationBuffer) = { 
    buffer(0, new java.util.ArrayList[Double]) 
    } 

    // Iterate over each entry of a group 
    def update(buffer: MutableAggregationBuffer, input: Row) = { 
    val DoubleList: util.ArrayList[Double] = new util.ArrayList[Double](buffer.getList(0)) 
    DoubleList.add(input.getDouble(0)) 
    DoubleList.add(input.getDouble(1)) 
    buffer.update(0, DoubleList) 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { 
    buffer1.update(0, buffer1.getList(0).toArray() ++ buffer2.getList(0).toArray()) 
    } 
    // Called after all the entries are exhausted. 
    def evaluate(buffer: Row) = { 
    buffer.getList(0).toArray() 
    } 
}