2017-02-01 12 views
1

У меня есть приложение Akka Stream с одним потоком/графиком. Я хочу измерять скорость потока в источнике и регистрировать его каждые 5 секунд, например «полученные 3 сообщения за последние 5 секунд». Я пробовал с,Как регистрировать скорость потока в потоке Akka?

someOtherFlow 
    .groupedWithin(Integer.MAX_VALUE, 5 seconds) 
    .runForeach(seq => 
    log.debug(s"received ${seq.length} messages in the last 5 seconds") 
) 

но он выводится только при наличии сообщений, без пустого списка при наличии 0 сообщений. Я тоже хочу 0. Это возможно?

ответ

2

Вы могли бы попробовать что-то вроде

src 
    .conflateWithSeed(_ ⇒ 1){ case (acc, _) ⇒ acc + 1 } 
    .zip(Source.tick(5.seconds, 5.seconds, NotUsed)) 
    .map(_._1) 

который должна партии своих элементов до тех пор, пока клещ отпускает их. Это вдохновляет от an example in the docs.

С другой стороны, если вам это нужно для целей мониторинга, вы можете использовать сторонний инструмент для этой цели - например, Kamon.

+0

Спасибо Стефано. Сложный код для такой простой задачи. У Kamon нет примеров онлайн (даже не блоги), где они измеряют скорость потоков akka и выводят ее. Я надеюсь, что есть что-то более простое. – Jasper

+1

Для примера измерения скорости с Kamon ознакомьтесь с этой демонстрацией https://github.com/svezfaz/akka-backpressure-scala-central-talk/tree/master/demo. Вы просто увеличиваете счетчик Kamon для каждого элемента (см. Flows.scala), который течет, а затем вы используете агрегатор скорости на панели управления (см. Папку Grafana в демо). –

+0

Я попробовал код сейчас, он (почти всегда) вернулся 0 за 5 секунд, он пропускает большинство сообщений. Кроме того, Kamon не является вариантом, потому что у меня нет панели управления, но просто хочу зарегистрировать ее. – Jasper

0

Расширение ответ Стефано немного я создал следующие потоки:

def flowRate[T](metric: T => Int = (_: T) => 1, outputDelay: FiniteDuration = 1 second): = 
    Flow[T] 
    .conflateWithSeed(metric(_)){ case (acc, x) ⇒ acc + metric(x) } 
    .zip(Source.tick(outputDelay, outputDelay, NotUsed)) 
    .map(_._1.toDouble/outputDelay.toUnit(SECONDS)) 

def printFlowRate[T](name: String, metric: T => Int = (_: T) => 1, 
        outputDelay: FiniteDuration = 1 second): Flow[T, T, NotUsed] = 
    Flow[T] 
    .alsoTo(flowRate[T](metric, outputDelay) 
       .to(Sink.foreach(r => log.info(s"Rate($name): $r")))) 

Первый преобразует поток в скорости в секунду. Вы можете предоставить metric, который дает значение каждому проходящему объекту. Скажем, вы хотите измерить скорость символов в потоке строк, затем можете пройти _.length. Второй параметр - это задержка между отчетами о скорости потока (по умолчанию - одна секунда).

Второй поток может использоваться inline для печати скорости потока для целей отладки без изменения значения, проходящего через поток. например,

, который будет показывать каждые 10 секунд среднюю скорость (в секунду) символов.