Я пытаюсь создать несколько тестов Unit, чтобы убедиться, что некоторые части моей топологии Trident делают то, что они предполагают.Как сохранить значения из Trident/Storm в списке (с использованием Java API)
Я хотел бы получить все значения, полученные после запуска топологии, и поместить их в список, чтобы я мог «видеть» и проверить условия на них.
FeederBatchSpout feederSpout = new FeederBatchSpout("some_time_field", "foo_id");
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", feederSpout)
.groupBy(new Fields("some_time_field", "foo_id"))
.aggregate(new Fields("foo_id"), new FooAggregator(),
new Fields("aggregated_foos"))
// Soo... how do I retrieve the "aggregated_foos" from here?
Я бег топологии как TrackedTopology
(получил код от another S.O. question, спасибо @brianghig за просьбу и @Thomas Kielbus для ответа)
Это, как я «запуск» топология и как я кормить выборочные значения в нем:
TrackedTopology tracked = Testing.mkTrackedTopology(cluster, topology.build());
cluster.submitTopology("unit_tests", config, tracked.getTopology());
feederSpout.feed(new Values(MyUtils.makeSampleFoo(1));
feederSpout.feed(new Values(MyUtils.makeSampleFoo(2));
Когда я делаю это, я могу увидеть в логах, что топология правильно работает, и что значения вычисляются правильно, но я хотел бы «рыба» Результаты выйдите в List
(или любую структуру, на данный момент), поэтому я могу на самом деле поставить Asserts
в свои тесты.
Я пытаюсь использовать несколько подходов, но ни один из них не работает.
Последняя идея добавляла болт после агрегации, так что «сохраняются» бы моих ценностей в список:
Ниже вы увидите класс, который пытается пройти через все кортежи, испускаемые aggregate
и поставит их в список, который я ранее инициализирован:
class FieldFetcherStateUpdater extends BaseStateUpdater<FieldFetcherState> {
final List<AggregatedFoo> results;
public FieldFetcherStateUpdater(List<AggregatedFoo> results) {
this.results = results;
}
@Override
public void updateState(FieldFetcherState state, List<TridentTuple> tuples,
TridentCollector collector) {
for (TridentTuple tuple : tuples) {
results.add((AggregatedFoo) tuple.getValue(0));
}
}
}
Так что теперь код будет выглядеть следующим образом:
// ...
List<AggregatedFoo> results = new ArrayList();
topology.newStream("spout1", feederSpout)
.groupBy(new Fields("some_time_field", "foo_id"))
.aggregate(new Fields("foo_id"), new FooAggregator(),
new Fields("aggregated_foos"))
.partitionPersist(new FieldFetcherFactory(),
new Fields("aggregated_foos"),
new FieldFetcherStateUpdater(results));
LOGGER.info("Done. Checkpoint results={}", results);
Но ничего ... Журналы показывают Done. Checkpoint results=[]
(пустой список)
Есть ли способ получить это? Я предполагаю, что это должно быть выполнимо, но я не смог найти способ ...
Любые подсказки или ссылки на страницы или что-нибудь подобное будут оценены. Заранее спасибо.
Это работает! Я чувствую себя так грязно, делая это по какой-то причине ... но это работает! Ура! спасибо – BorrajaX