2016-01-20 2 views
0

Я пытаюсь создать несколько тестов Unit, чтобы убедиться, что некоторые части моей топологии Trident делают то, что они предполагают.Как сохранить значения из Trident/Storm в списке (с использованием Java API)

Я хотел бы получить все значения, полученные после запуска топологии, и поместить их в список, чтобы я мог «видеть» и проверить условия на них.

FeederBatchSpout feederSpout = new FeederBatchSpout("some_time_field", "foo_id"); 
    TridentTopology topology = new TridentTopology(); 
    topology.newStream("spout1", feederSpout) 
    .groupBy(new Fields("some_time_field", "foo_id")) 
    .aggregate(new Fields("foo_id"), new FooAggregator(), 
       new Fields("aggregated_foos")) 
    // Soo... how do I retrieve the "aggregated_foos" from here? 

Я бег топологии как TrackedTopology (получил код от another S.O. question, спасибо @brianghig за просьбу и @Thomas Kielbus для ответа)

Это, как я «запуск» топология и как я кормить выборочные значения в нем:

TrackedTopology tracked = Testing.mkTrackedTopology(cluster, topology.build()); 
cluster.submitTopology("unit_tests", config, tracked.getTopology()); 

feederSpout.feed(new Values(MyUtils.makeSampleFoo(1)); 
feederSpout.feed(new Values(MyUtils.makeSampleFoo(2)); 

Когда я делаю это, я могу увидеть в логах, что топология правильно работает, и что значения вычисляются правильно, но я хотел бы «рыба» Результаты выйдите в List (или любую структуру, на данный момент), поэтому я могу на самом деле поставить Asserts в свои тесты.

Я пытаюсь использовать несколько подходов, но ни один из них не работает.

Последняя идея добавляла болт после агрегации, так что «сохраняются» бы моих ценностей в список:

Ниже вы увидите класс, который пытается пройти через все кортежи, испускаемые aggregate и поставит их в список, который я ранее инициализирован:

class FieldFetcherStateUpdater extends BaseStateUpdater<FieldFetcherState> { 
    final List<AggregatedFoo> results; 

    public FieldFetcherStateUpdater(List<AggregatedFoo> results) { 
     this.results = results; 
    } 

    @Override 
    public void updateState(FieldFetcherState state, List<TridentTuple> tuples, 
          TridentCollector collector) { 
     for (TridentTuple tuple : tuples) { 
      results.add((AggregatedFoo) tuple.getValue(0)); 
     } 
    } 
} 

Так что теперь код будет выглядеть следующим образом:

// ... 
List<AggregatedFoo> results = new ArrayList(); 
topology.newStream("spout1", feederSpout) 
    .groupBy(new Fields("some_time_field", "foo_id")) 
    .aggregate(new Fields("foo_id"), new FooAggregator(), 
       new Fields("aggregated_foos")) 
    .partitionPersist(new FieldFetcherFactory(), 
         new Fields("aggregated_foos"), 
         new FieldFetcherStateUpdater(results)); 

    LOGGER.info("Done. Checkpoint results={}", results); 

Но ничего ... Журналы показывают Done. Checkpoint results=[] (пустой список)

Есть ли способ получить это? Я предполагаю, что это должно быть выполнимо, но я не смог найти способ ...

Любые подсказки или ссылки на страницы или что-нибудь подобное будут оценены. Заранее спасибо.

ответ

0

Необходимо использовать переменную статического элемента result. Если у вас запущено несколько параллельных задач (например, parallelism_hint > 1), вам также необходимо указать synchronize доступ для записи к result.

В вашем случае result будет пустым, потому что Storm внутренне создает новый экземпляр вашего болта (включая новый экземпляр ArrayList). Использование статической переменной гарантирует, что вы получите доступ к правильному объекту (так как будет только один из всех экземпляров вашего болта).

+0

Это работает! Я чувствую себя так грязно, делая это по какой-то причине ... но это работает! Ура! спасибо – BorrajaX

 Смежные вопросы

  • Нет связанных вопросов^_^