Я использую Spark 2.0.2, Kafka 0.10.1 и интеграцию с искрообразованием-кафкой-0-8. Я хочу сделать следующее:Плохая работа с функцией окна в потоковой работе
Я извлекаю функции в потоковой работе из соединений NetFlow, а не применяю записи к k-образной модели. Некоторые из функций являются простыми, которые вычисляются непосредственно из записи. Но у меня также есть более сложные функции, которые раньше зависят от записей из указанного временного окна. Они подсчитывают, сколько соединений за последнюю секунду было связано с тем же хостом или сервисом, что и текущий. Я решил использовать для этого функции окна SQL.
Так я строю спецификации окон:
val hostCountWindow = Window.partitionBy("plainrecord.ip_dst").orderBy(desc("timestamp")).rangeBetween(-1L, 0L)
val serviceCountWindow = Window.partitionBy("service").orderBy(desc("timestamp")).rangeBetween(-1L, 0L)
и функция, которая вызывается для извлечения этой функции на каждую партию:
def extractTrafficFeatures(dataset: Dataset[Row]) = {
dataset
.withColumn("host_count", count(dataset("plainrecord.ip_dst")).over(hostCountWindow))
.withColumn("srv_count", count(dataset("service")).over(serviceCountWindow))
}
И использовать эту функцию следующим образом
stream.map(...).map(...).foreachRDD { rdd =>
val dataframe = rdd.toDF(featureHeaders: _*).transform(extractTrafficFeatures(_))
...
}
Проблема в том, что это имеет очень плохую производительность. Для партии требуется от 1 до 3 секунд для средней скорости ввода менее 100 записей в секунду. Я предполагаю, что это происходит из раздела, который вызывает много перетасовки?
Я попытался использовать API RDD и countByValueAndWindow()
. Это выглядит намного быстрее, но код выглядит лучше и чище с помощью API DataFrame.
Есть ли лучший способ рассчитать эти функции в потоковых данных? Или я делаю что-то не так?