Я хочу использовать FlinkCEP только для «ленивого» совпадения на шаблоне. Как я могу это сделать? , например. У меня есть входной поток ACABCABCB, и я хочу совместить с A followBy C, чтобы получить только 3 матча, а не 6 матчей.Хо, я могу провести ленивый матч с Flink CEP
Я создал следующий пример, чтобы проиллюстрировать мою проблему.
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
case class MyEvent(id: Int, kind: String, value: String)
case class MyAggregatedEvent(id: Int, concatenatedValue: String)
val eventStream = env.fromElements(
MyEvent(1, "A", "1"), MyEvent(1, "C", "1"),
MyEvent(1, "A", "2"), MyEvent(1, "B", "1"), MyEvent(1, "C", "2"),
MyEvent(1, "A", "3"), MyEvent(1, "D", "2"), MyEvent(1, "C", "3"),
MyEvent(1, "B", "3")
)
val pattern: Pattern[MyEvent, _] = Pattern
.begin[MyEvent]("pA").where(e => e.kind == "A")
.next("pC").where(e => e.kind == "C")
.within(Time.seconds(5))
val patternNextStream: PatternStream[MyEvent] = CEP.pattern(eventStream.keyBy(_.id), pattern)
val outNextStream: DataStream[MyAggregatedEvent] = patternNextStream.flatSelect {
(pattern: scala.collection.mutable.Map[String, MyEvent], collector: Collector[MyAggregatedEvent]) =>
val partA = pattern.get("pA").get
val partC = pattern.get("pC").get
collector.collect(MyAggregatedEvent(partA.id, partA.value + "=>" + partC.value))
}
outNextStream.print()
env.execute("Experiment")
Это дает мне следующий вывод:
MyAggregatedEvent (1,1 => 1)
Когда я изменить шаблон, чтобы:
val pattern: Pattern[MyEvent, _] = Pattern
.begin[MyEvent]("pA").where(e => e.kind == "A")
.followedBy("pC").where(e => e.kind == "C")
.within(Time.seconds(5))
Тогда печатается следующее:
MyAggregatedEvent (1,1 => 1)
MyAggregatedEvent (1,1 => 2)
MyAggregatedEvent (1,2 => 2)
MyAggregatedEvent (1,1 => 3)
MyAggregatedEvent (1,2 = > 3)
MyAggregatedEvent (1,3 => 3)
Как я могу создать шаблон, который соответствует только каждое событие один раз, так что мой вывод будет:
MyAggregatedEvent (1,1 => 1)
MyAggregatedEvent (1,2 => 2)
MyAggregatedEvent (1,3 => 3)