2016-06-09 3 views
1

Я новичок в Flink Streaming API и хочу выполнить следующую простую задачу (IMO). У меня есть два потока, и я хочу присоединиться к ним, используя окна с подсчетом. Код, который у меня есть, следующий:Присоединяйтесь к двум потокам, используя окно с подсчетом

public class BaselineCategoryEquiJoin { 

private static final String recordFile = "some_file.txt"; 

private static class ParseRecordFunction implements MapFunction<String, Tuple2<String[], MyRecord>> { 
    public Tuple2<String[], MyRecord> map(String s) throws Exception { 
     MyRecord myRecord = parse(s); 
     return new Tuple2<String[], myRecord>(myRecord.attributes, myRecord); 
    } 
} 

public static void main(String[] args) throws Exception { 
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment(); 
    ExecutionConfig config = environment.getConfig(); 
    config.setParallelism(8); 
    DataStream<Tuple2<String[], MyRecord>> dataStream = environment.readTextFile(recordFile) 
      .map(new ParseRecordFunction()); 
    DataStream<Tuple2<String[], MyRecord>> dataStream1 = environment.readTextFile(recordFile) 
      .map(new ParseRecordFunction()); 
    DataStreamSink<Tuple2<String[], String[]>> joinedStream = dataStream1 
      .join(dataStream) 
      .where(new KeySelector<Tuple2<String[],MyRecord>, String[]>() { 
       public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception { 
        return recordTuple2.f0; 
       } 
      }).equalTo(new KeySelector<Tuple2<String[], MyRecord>, String[]>() { 
       public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception { 
        return recordTuple2.f0; 
       } 
      }).window(TumblingProcessingTimeWindows.of(Time.seconds(1))) 
      .apply(new JoinFunction<Tuple2<String[],MyRecord>, Tuple2<String[],MyRecord>, Tuple2<String[], String[]>>() { 
       public Tuple2<String[], String[]> join(Tuple2<String[], MyRecord> tuple1, Tuple2<String[], MyRecord> tuple2) throws Exception { 
        return new Tuple2<String[], String[]>(tuple1.f0, tuple1.f0); 
       } 
      }).print(); 
    environment.execute(); 
} 
} 

Мой код работает без ошибок, но не дает никаких результатов. На самом деле вызов метода apply никогда не вызывается (проверяется добавлением точки останова в режиме отладки). Я думаю, главная причина предыдущего заключается в том, что мои данные не имеют атрибута времени. Поэтому оконное (материализованное через window) выполнено неправильно. Поэтому, мой вопрос заключается в том, как я могу указать, что я хочу, чтобы мое соединение состоялось на основе count-windows. Например, я хочу, чтобы объединение материализовалось каждые 100 кортежей из каждого потока. Возможно ли предыдущее в Flink? Если да, что я должен изменить в своем коде для его достижения.

На этом этапе я должен сообщить вам, что я попытался вызвать метод countWindow(), но по какой-либо причине он не предложен Флинком JoinedStreams.

Спасибо

ответ

2

Соединения с подсчетом не поддерживаются. Вы можете эмулировать окна с подсчетом, используя семантику «время-время» и применять уникальный seq-id как метку времени для каждой записи. Таким образом, временным окном «5» было бы эффективное окно счета 5.

+0

Не могли бы вы посмотреть на это https://stackoverflow.com/questions/46282692/match-based-on- некоторые-свойство-между-два-данных-потоки-и-собирать-все на основе-на-м – Kumar