Я новичок в Flink Streaming API и хочу выполнить следующую простую задачу (IMO). У меня есть два потока, и я хочу присоединиться к ним, используя окна с подсчетом. Код, который у меня есть, следующий:Присоединяйтесь к двум потокам, используя окно с подсчетом
public class BaselineCategoryEquiJoin {
private static final String recordFile = "some_file.txt";
private static class ParseRecordFunction implements MapFunction<String, Tuple2<String[], MyRecord>> {
public Tuple2<String[], MyRecord> map(String s) throws Exception {
MyRecord myRecord = parse(s);
return new Tuple2<String[], myRecord>(myRecord.attributes, myRecord);
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
ExecutionConfig config = environment.getConfig();
config.setParallelism(8);
DataStream<Tuple2<String[], MyRecord>> dataStream = environment.readTextFile(recordFile)
.map(new ParseRecordFunction());
DataStream<Tuple2<String[], MyRecord>> dataStream1 = environment.readTextFile(recordFile)
.map(new ParseRecordFunction());
DataStreamSink<Tuple2<String[], String[]>> joinedStream = dataStream1
.join(dataStream)
.where(new KeySelector<Tuple2<String[],MyRecord>, String[]>() {
public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception {
return recordTuple2.f0;
}
}).equalTo(new KeySelector<Tuple2<String[], MyRecord>, String[]>() {
public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception {
return recordTuple2.f0;
}
}).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.apply(new JoinFunction<Tuple2<String[],MyRecord>, Tuple2<String[],MyRecord>, Tuple2<String[], String[]>>() {
public Tuple2<String[], String[]> join(Tuple2<String[], MyRecord> tuple1, Tuple2<String[], MyRecord> tuple2) throws Exception {
return new Tuple2<String[], String[]>(tuple1.f0, tuple1.f0);
}
}).print();
environment.execute();
}
}
Мой код работает без ошибок, но не дает никаких результатов. На самом деле вызов метода apply
никогда не вызывается (проверяется добавлением точки останова в режиме отладки). Я думаю, главная причина предыдущего заключается в том, что мои данные не имеют атрибута времени. Поэтому оконное (материализованное через window
) выполнено неправильно. Поэтому, мой вопрос заключается в том, как я могу указать, что я хочу, чтобы мое соединение состоялось на основе count-windows. Например, я хочу, чтобы объединение материализовалось каждые 100 кортежей из каждого потока. Возможно ли предыдущее в Flink? Если да, что я должен изменить в своем коде для его достижения.
На этом этапе я должен сообщить вам, что я попытался вызвать метод countWindow()
, но по какой-либо причине он не предложен Флинком JoinedStreams
.
Спасибо
Не могли бы вы посмотреть на это https://stackoverflow.com/questions/46282692/match-based-on- некоторые-свойство-между-два-данных-потоки-и-собирать-все на основе-на-м – Kumar