2016-12-29 10 views
0

Я пытаюсь переписать википедия редактировать поток аналитик в Apache FLiNK учебников в Scala от https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.htmlApache Flink Википедию редактировать аналитик с Scala

Код из учебника является

import org.apache.flink.api.common.functions.FoldFunction; 
import org.apache.flink.api.java.functions.KeySelector; 
import org.apache.flink.api.java.tuple.Tuple2; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.datastream.KeyedStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.windowing.time.Time; 
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent; 
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource; 

public class WikipediaAnalysis { 

    public static void main(String[] args) throws Exception { 

    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); 

    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource()); 

    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits 
     .keyBy(new KeySelector<WikipediaEditEvent, String>() { 
     @Override 
     public String getKey(WikipediaEditEvent event) { 
      return event.getUser(); 
     } 
     }); 

    DataStream<Tuple2<String, Long>> result = keyedEdits 
     .timeWindow(Time.seconds(5)) 
     .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() { 
     @Override 
     public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) { 
      acc.f0 = event.getUser(); 
      acc.f1 += event.getByteDiff(); 
      return acc; 
     } 
     }); 

    result.print(); 

    see.execute(); 
    } 
} 

ниже моей попытка в Скале

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} 
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource} 
import org.apache.flink.streaming.api.windowing.time.Time 


object WikipediaAnalytics extends App{ 

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 

    val edits = env.addSource(new WikipediaEditsSource()); 

    val keyedEdits = edits.keyBy(event => event.getUser) 

    val result = keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L), (we: WikipediaEditEvent, t: (String, Long)) => 
    (we.getUser, t._2 + we.getByteDiff)) 

} 

, который является более или менее словом преобразования слова к Скале, на основании которых тип val result должен быть DataStream[(String, Long)], но фактический тип выводится после fold() нет, где близко.

Пожалуйста, помогите определить, что не так с кодом лестницу

EDIT1: сделал следующие изменения, используя Карринг схему fold[R] и тип в настоящее время подтверждает ожидаемый тип, но до сих пор не мог разжиться причине, хотя

val result_1: (((String, Long), WikipediaEditEvent) => (String, Long)) => DataStream[(String, Long)] = 
    keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L)) 

    val result_2: DataStream[(String, Long)] = result_1((t: (String, Long), we: WikipediaEditEvent) => 
    (we.getUser, t._2 + we.getByteDiff)) 
+0

Я думаю, что если есть много возможностей для решения окончательного типа, вам нужно будет дать системе вывода типа подсказку. – Ashalynd

ответ

1

проблема, кажется, складка, вы должны иметь закрывающую скобку после вашего аккумулятора InitialValue. Когда вы это исправите, код не сможет скомпилироваться, потому что у него нет TypeInformation, доступного для WikipediaEditEvent. Самый простой способ разрешить это - импортировать больше API flink scala. См. Ниже полный пример:

import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource 
import org.apache.flink.streaming.api.windowing.time.Time 

object WikipediaAnalytics extends App { 
    val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 
    val edits = see.addSource(new WikipediaEditsSource()) 
    val userEditsVolume: DataStream[(String, Int)] = edits 
    .keyBy(_.getUser) 
    .timeWindow(Time.seconds(5)) 
    .fold(("", 0))((acc, event) => (event.getUser, acc._2 + event.getByteDiff)) 
    userEditsVolume.print() 
    see.execute("Wikipedia User Edit Volume") 
} 

 Смежные вопросы

  • Нет связанных вопросов^_^