2016-09-09 8 views
2

Я пишу прототип приложения, используя Apache Flink. В этом процессе я решил использовать org.apache.flink.streaming.api.functions.windowing.WindowFunction для конкретного примера использования. Однако при написании тела функции apply() я столкнулся с этой ошибкой (код ниже не из приложения, которое я пишу - мои типы данных разные - это из кода примера, доступного на сайте документации Flink):Scala WindowFunction не компилируется

import scala.collection.Iterable 
import scala.collection.Map 
import org.apache.flink.streaming.api.functions.windowing.WindowFunction 
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow} 
import org.apache.flink.util.Collector 
import scala.collection.JavaConversions._ 

class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] { 

    def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = { 
    var count = 0L 
    for (in <- input) { 
     count = count + 1 
    } 
    out.collect(s"Window $window count: $count") 
    } 
} 

компилятор жалуется:

Error:(16, 7) class MyWindowFunction needs to be abstract, since method apply in trait WindowFunction of type 
(x$1: String, x$2: org.apache.flink.streaming.api.windowing.windows.TimeWindow, 
x$3: Iterable[(String, Long)], 
x$4: org.apache.flink.util.Collector[String])Unit is not defined 
    class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] { 

Я проверил порядок параметров в применяются(); они кажутся правильными.

По какой-то причине я не могу определить точный источник ошибки. Может кто-то, пожалуйста, подтолкнуть меня к решению?

ответ

4

Я нашел причину этой ошибки.

Что было не ясно, для меня был тот факт, что API Apache FLiNK ожидает в java.lang.Iterable, а его эквивалент Scala:

class MyWindowFunction extends 
     WindowFunction[(String, Long), String, String, TimeWindow] { 

    override 
    def apply(
     key: String, 
     w: TimeWindow, 
     iterable: Iterable[(String, Long)], // from java.lang.Iterable 
     collector: Collector[String]): Unit = { 

     // .... 
    } 
} 

Итак, я должен был импортировать надлежащим образом:

import java.lang.Iterable // From Java 
import java.util.Map  // From Java 

import org.apache.flink.streaming.api.functions.windowing.WindowFunction 
import org.apache.flink.streaming.api.windowing.windows.TimeWindow 
import org.apache.flink.util.Collector 

import scala.collection.JavaConversions._ // Implicit conversions 

class MyWindowFunction 
    extends WindowFunction[(String, Long), String, String, TimeWindow] { 

    override 
    def apply(
     key: String, 
     w: TimeWindow, 
     iterable: Iterable[(String, Long)], 
     collector: Collector[String]): Unit = { 

    // .... 

    } 
} 

Все было хорошо!

 Смежные вопросы

  • Нет связанных вопросов^_^