-> Ограничений по количеству потоков нет, флайн будет масштабироваться в зависимости от памяти/CPU диспетчера заданий/диспетчера задач, используемой распараллеливания и количества слотов. Я использую YARN для управления ресурсами. Если количество подключенных потоков велико, тогда нам нужно быть немного осторожным, чтобы не все/основная часть обработки выполнялась в некоторых диспетчерах задач, так как это замедлит процесс. В кафкинском потоке могут быть задержки или внутренние задержки из-за того, что некоторые диспетчеры задач могут загружаться с большой нагрузкой, и для этого необходимо установить профилактические проверки.
-> Поддержка непрерывных запросов была построена как часть последней версии flink, вы можете проверить документацию flink для нее.
-> Если путем чтения одного потока данных другому вы подразумеваете подключение двух потоков в терминологии flink, тогда мы можем связать их с общим ключом и поддерживать состояние значения. Обратите внимание, что состояние значения сохраняется в диспетчере задач и не используется совместно с диспетчерами задач. Иначе, если вы подразумеваете объединение двух или более потоков, то мы можем строить плоские функции таким образом, чтобы данные из таких потоков поступали в стандартном формате.
Пример объединения: вал Stream1: DataStream [UserBookingEvent] = BookingClosure.getSource (RunMode) .getSource (ENV) .map (новый ClosureMapFunction)
вал stream2: DataStream [UserBookingEvent] = BookingCancel.getSource (RunMode) .getSource (ENV) .map (новый CancelMapFunction)
вал unionStream: DataStream [UserBookingEvent] = stream1.union (stream2)
import org.apache.flink.api.common.functions.MapFunction
import org.json4s.jackson.JsonMethods.{parse => _, parseOpt => _}
import org.json4s.native.JsonMethods._
import org.slf4j.{Logger, LoggerFactory}
class CancelMapFunction extends MapFunction[String, Option[UserBookingEvent]] {
override def map(in: String): Option[UserBookingEvent] = {
val LOG: Logger = LoggerFactory.getLogger(classOf[CancelMapFunction])
try {
implicit lazy val formats = org.json4s.DefaultFormats
val json = parse(in)
..............
} catch {
case e: Exception => {
LOG.error("Could not parse Cancel Event= " + in + e.getMessage)
None
}
}