1

Я пытаюсь внедрить актерскую систему с возможностью обратного давления. В качестве требования мастер-процесс получает потоковые данные в формате JSON. Однако каждое событие JSON имеет несколько полей, таких как {ip: '123.43.12.1', страна: 'US', ... и т. Д.}. Структура JSON известна заранее.Как создать реактивную систему актерского потока с возможностью вентиляции

Теперь я должен как-то сгладить структуру JSON в (ключ, значение). Например, вышеупомянутые данные могут быть сплющены в (ip, freq), (страна, частота), где частота - это время, в течение которого в потоке данных появляется ip (например, «123.43.12.1»).

Очень естественным способом является пересылка каждой пары (ключ, значение) соответствующему ребенку/удаленному актеру для дальнейшей оценки. Например, ('123.43.12.1', 1) отправляется на IP-Actor; («США», 1) отправляется в Country-Actor и так далее.

Я хочу убедиться, что вся система находится под давлением. В этом случае дело сложнее, потому что событие {ip: '123.43.12.1', country: 'US'} рассматривается только как обработанное, если оба IP-Actor и Country-Actor завершили обработку сплющенной пары ('123.43. 12.1 ', 1), (' US ', 1). Каждый актер может иметь разную скорость обработки (например, IP-Actor намного быстрее, чем Country-Actor). В этом случае я хочу, чтобы главный процесс, который получил поток, будет ждать/блокироваться до тех пор, пока не появится сигнал потребности (произойдет, когда оба участника закончат обработку существующих данных в своем почтовом ящике). В противном случае, какой-то актер может заполнить сообщение в почтовом ящике (Country-Actor - slow one), но сообщение все еще приходит, потому что другой почтовый ящик актера пуст (IP-Actor - более быстрый).

Может ли кто-нибудь предложить, если характеристики реактивного потока обеспечивают такую ​​функциональность. Если нет, то в любом случае для достижения функциональности наиболее эффективным способом.

Спасибо.

ответ

0

Тип синхронизации между действующими лицами, которые вы описали, именно то, чего вы хотите избежать в модели Actor. Любой «wait/block» является противоположным для реактивного программирования. Я предлагаю использовать один поток Flow для обновления.

Сначала необходимо обработать данные JSon:

import akka.stream.scaladsl._ 

//your original source of json strings 
val jsonSrc : Source[String, NotUsed] = ??? 

case class JsonObject(ip : String, country : String) 

//use your favorite json parser 
def jsonParser(jsonStr : String) : JsonObject = ??? 

val parserFlow = Flow[String] map jsonParser 

Следующих определить счетчик логику и использовать Flow.scan для создания счетчика с увеличивающимися значениями:

type IPCounter = Map[String,Int] 
val emptyIPCounter = Map.empty[String,Int] withDefaultValue 0 

type CountryCounter = Map[String, Int] 
val emptyCountryCounter = Map.empty[String,Int] withDefaultValue 0 

type Counters = Tuple2[IPCounter, CountryCounter] 
val emptyCounters = (emptyIPCounter, emptyCountryCounter) 

def updateCounters(counters : Counters, jsonObj : JsonObject) : Counters = { 
    (counters._1.updated(jsonObj.ip, counters._1(jsonObj.ip) + 1), 
    counters._2.updated(jsonObj.country, counters._2(jsonObj.country) + 1)) 
} 

val counterFlow = Flow[JsonObject].scan(emptyCounters)(updateCounters) 

Наконец, объединить все вместе:

val counterSource : Source[Counters, NotUsed] = jsonSrc via parserFlow via counterFlow 

Результат - это именно то, что вы просили : поток с прореживанием, который только пересылает значения счетчика, когда все счетчики были обновлены.

 Смежные вопросы

  • Нет связанных вопросов^_^