У меня есть, например, эти партии кортежей с размером партии 5 с впечатлениями от пользователей:Как получить предыдущее состояние перед обновлением счетчика
Batch 1:
[UUID1, clientId1]
[UUID2, clientId1]
[UUID2, clientId1]
[UUID2, clientId1]
[UUID3, clientId2]
Batch 2:
[UUID4, clientId1]
[UUID5, clientId1]
[UUID5, clientId1]
[UUID6, clientId2]
[UUID6, clientId2]
И это мой пример сохранения количества состояния:
TridentState ClientState = impressionStream
.groupBy(new Fields("clientId"))
.persistentAggregate(getCassandraStateFactory("users", "DataComputation",
"UserImpressionCounter"), new Count(), new Fields("count));
Stream ClientStream = ClientState.newValuesStream();
У меня есть ясная база данных и выполняю мою топологию. После группирования потока по clientId я сохраняю состояние с функцией persistentAggregate и агрегатором Count. Для первой партии - результат после метода newValuesStream: [clientId1, 4]
, [clientId2, 1]
. Для второй партии: [clientId1, 7]
, [clientId2, 3]
как и ожидалось.
ClientStream используется в паре ветвей и в одном из этих ветвей мне нужно обрабатывать кортежи так, чтобы иметь партию с размером 1, потому что необходимо получить информацию о подсчете для каждого кортежа. Пакет с размером 1, очевидно, дерьмо, поэтому мне нужно как-то узнать предыдущее состояние счетчика, прежде чем обновлять его и испускать , эту информацию с кортежем уже обновил счетчик, например. для второй партии [clientId1, 7, 4]
.
Есть ли идеи, как это сделать?