2013-12-04 7 views
0

У меня есть, например, эти партии кортежей с размером партии 5 с впечатлениями от пользователей:Как получить предыдущее состояние перед обновлением счетчика

Batch 1: 
[UUID1, clientId1] 
[UUID2, clientId1] 
[UUID2, clientId1] 
[UUID2, clientId1] 
[UUID3, clientId2] 

Batch 2: 
[UUID4, clientId1] 
[UUID5, clientId1] 
[UUID5, clientId1] 
[UUID6, clientId2] 
[UUID6, clientId2] 

И это мой пример сохранения количества состояния:

TridentState ClientState = impressionStream 
    .groupBy(new Fields("clientId")) 
    .persistentAggregate(getCassandraStateFactory("users", "DataComputation", 
     "UserImpressionCounter"), new Count(), new Fields("count)); 

Stream ClientStream = ClientState.newValuesStream(); 

У меня есть ясная база данных и выполняю мою топологию. После группирования потока по clientId я сохраняю состояние с функцией persistentAggregate и агрегатором Count. Для первой партии - результат после метода newValuesStream: [clientId1, 4], [clientId2, 1]. Для второй партии: [clientId1, 7], [clientId2, 3] как и ожидалось.

ClientStream используется в паре ветвей и в одном из этих ветвей мне нужно обрабатывать кортежи так, чтобы иметь партию с размером 1, потому что необходимо получить информацию о подсчете для каждого кортежа. Пакет с размером 1, очевидно, дерьмо, поэтому мне нужно как-то узнать предыдущее состояние счетчика, прежде чем обновлять его и испускать , эту информацию с кортежем уже обновил счетчик, например. для второй партии [clientId1, 7, 4].

Есть ли идеи, как это сделать?

ответ

0

Я решил эту проблему путем добавления новых агрегатор и присоединиться сохраняется агрегат:

TridentState ClientState = impressionStream 
    .groupBy(new Fields("clientId")) 
    .persistentAggregate(getCassandraStateFactory("users", "DataComputation", 
     "UserImpressionCounter"), new Count(), new Fields("count)); 

Stream ClientBatchAggregationStream = impressionStream 
    .groupBy(new Fields("clientId")) 
    .aggregate(new SumCountAggregator(), new Fields("batchCount")); 

Stream GroupingPeriodCounterStateStream = topology 
    .join(ClientState.newValuesStream(), new Fields("clientId"), 
     ClientBatchAggregationStream, new Fields("clientId"), 
     new Fields("clientId", "count", "batchCount")); 

SumCountAggregator:

public class SumCountAggregator extends BaseAggregator<SumCountAggregator.CountState> { 

    static class CountState { 
     long count = 0; 
    } 

    @Override 
    public CountState init(Object batchId, TridentCollector collector) { 
     return new CountState(); 
    } 

    @Override 
    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector)   { 
     state.count += 1; 
    } 

    @Override 
    public void complete(CountState state, TridentCollector collector) { 
     collector.emit(new Values(state.count)); 
    } 

}