Я пытаюсь переписать википедия редактировать поток аналитик в Apache FLiNK учебников в Scala от https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.htmlApache Flink Википедию редактировать аналитик с Scala
Код из учебника является
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
public class WikipediaAnalysis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
.keyBy(new KeySelector<WikipediaEditEvent, String>() {
@Override
public String getKey(WikipediaEditEvent event) {
return event.getUser();
}
});
DataStream<Tuple2<String, Long>> result = keyedEdits
.timeWindow(Time.seconds(5))
.fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
acc.f0 = event.getUser();
acc.f1 += event.getByteDiff();
return acc;
}
});
result.print();
see.execute();
}
}
ниже моей попытка в Скале
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}
import org.apache.flink.streaming.api.windowing.time.Time
object WikipediaAnalytics extends App{
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val edits = env.addSource(new WikipediaEditsSource());
val keyedEdits = edits.keyBy(event => event.getUser)
val result = keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L), (we: WikipediaEditEvent, t: (String, Long)) =>
(we.getUser, t._2 + we.getByteDiff))
}
, который является более или менее словом преобразования слова к Скале, на основании которых тип val result
должен быть DataStream[(String, Long)]
, но фактический тип выводится после fold()
нет, где близко.
Пожалуйста, помогите определить, что не так с кодом лестницу
EDIT1: сделал следующие изменения, используя Карринг схему fold[R]
и тип в настоящее время подтверждает ожидаемый тип, но до сих пор не мог разжиться причине, хотя
val result_1: (((String, Long), WikipediaEditEvent) => (String, Long)) => DataStream[(String, Long)] =
keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L))
val result_2: DataStream[(String, Long)] = result_1((t: (String, Long), we: WikipediaEditEvent) =>
(we.getUser, t._2 + we.getByteDiff))
Я думаю, что если есть много возможностей для решения окончательного типа, вам нужно будет дать системе вывода типа подсказку. – Ashalynd