3

Я использую Spark 2.0.2, Kafka 0.10.1 и интеграцию с искрообразованием-кафкой-0-8. Я хочу сделать следующее:Плохая работа с функцией окна в потоковой работе

Я извлекаю функции в потоковой работе из соединений NetFlow, а не применяю записи к k-образной модели. Некоторые из функций являются простыми, которые вычисляются непосредственно из записи. Но у меня также есть более сложные функции, которые раньше зависят от записей из указанного временного окна. Они подсчитывают, сколько соединений за последнюю секунду было связано с тем же хостом или сервисом, что и текущий. Я решил использовать для этого функции окна SQL.

Так я строю спецификации окон:

val hostCountWindow = Window.partitionBy("plainrecord.ip_dst").orderBy(desc("timestamp")).rangeBetween(-1L, 0L) 
val serviceCountWindow = Window.partitionBy("service").orderBy(desc("timestamp")).rangeBetween(-1L, 0L) 

и функция, которая вызывается для извлечения этой функции на каждую партию:

def extractTrafficFeatures(dataset: Dataset[Row]) = { 
    dataset 
    .withColumn("host_count", count(dataset("plainrecord.ip_dst")).over(hostCountWindow)) 
    .withColumn("srv_count", count(dataset("service")).over(serviceCountWindow)) 
} 

И использовать эту функцию следующим образом

stream.map(...).map(...).foreachRDD { rdd => 
    val dataframe = rdd.toDF(featureHeaders: _*).transform(extractTrafficFeatures(_)) 
    ... 
} 

Проблема в том, что это имеет очень плохую производительность. Для партии требуется от 1 до 3 секунд для средней скорости ввода менее 100 записей в секунду. Я предполагаю, что это происходит из раздела, который вызывает много перетасовки?

Я попытался использовать API RDD и countByValueAndWindow(). Это выглядит намного быстрее, но код выглядит лучше и чище с помощью API DataFrame.

Есть ли лучший способ рассчитать эти функции в потоковых данных? Или я делаю что-то не так?

ответ

0

Относительно низкую производительность следует ожидать здесь. Ваш код должен перетасовать и сортировать данные дважды, один раз:

Window 
    .partitionBy("plainrecord.ip_dst") 
    .orderBy(desc("timestamp")).rangeBetween(-1L, 0L) 

и один раз:

Window 
    .partitionBy("service") 
    .orderBy(desc("timestamp")).rangeBetween(-1L, 0L) 

Это будет иметь огромное влияние на время выполнения, и если эти жесткие требования вы выиграли» быть в состоянии сделать намного лучше.