2016-11-11 3 views
0

Я использую Apache Camel и получаю большой файл для ввода, который я должен обрабатывать по строкам. Содержимое уже отсортировано, и мне нужно собрать все последовательные строки с одним и тем же ключом корреляции. Если изменяется корреляционный ключ, предыдущий агрегат должен быть завершен. Если файл завершится, последний агрегат также будет завершен. У меня есть некоторые ограничения: - Поскольку входящий файл довольно большой, мы хотим его обработать потоковым способом. - Поскольку результат предоставляется синхронной конечной точке, я не хочу использовать предикат завершения таймаута. В противном случае я потерял бы противодавление, регулирующее скорость потребления источника данных, и обмены будут накапливаться в тайм-аут-карте и агрегированном хранилище AggregateProcessor.Совокупность только последовательных обменов с одним и тем же ключом корреляции

PreCompletionAwareAggregationStrategy выглядит многообещающим решением, но оказалось, что последний агрегат не будет завершен до появления следующего файла. Если я использую свойство CamelSplitComplete в preComplete, последний агрегат будет завершен, но без последнего входящего обмена. Вместо этого последний обмен будет добавлен в содержимое следующего файла.

Так что в настоящее время я совершенно потерял решение, которое не является излишне уродливым.

+0

Я регистрируемый билет, чтобы увидеть, если мы может получить что-то OOTB, чтобы сделать это проще: https://issues.apache.org/jira/browse/CAMEL-10474 –

+0

Теперь вы можете сделать это через CAMEL-10474. Но я записал еще один билет, чтобы сделать это еще проще: https://issues.apache.org/jira/browse/CAMEL-12296 –

ответ

0

В описанном сценарии, я послал бы расщепляется сообщения на маршруте с агрегатором (давайте назовем его «AggregationRoute»), который его стратегия агрегации реализует PreCompletionAwareAggregationStrategy (как вы уже используете его, Я полагаю). Затем, когда сплит заканчивается, установите для заголовка AGGREGATION_COMPLETE_ALL_GROUPS значение true и отправьте его в AggregationRoute. Этот обмен будет использоваться только как сигнал для заполнения всех групп агрегации.

Пример:


    ... 
    .split(body()).streaming() 
     .to("direct:aggregationRoute") 
    .end() 
    .setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS,constant(true)) 
    .to("direct:aggregationRoute"); 

from("direct:aggregationRoute") 
    .aggregate([your correlation expression]), myAggregationStrategy) 
    ... 

Другой альтернативой является использование AggregateController для завершения агрегации всех групп путем вызова метода forceCompletionOfAllGroups():


AggregateController aggregateController = new DefaultAggregateController(); 

from(...) 
    ... 
    .split(body()).streaming() 
     .aggregate([correlation expression], aggregationStrategy).aggregateController(aggregateController) 
      ... 
      // Do what you need to do with the aggregated exchange 
      ... 
     .end() 
    .end() 
    .bean(aggregateController, "forceCompletionOfAllGroups") 
0

Ну, может быть, один подход может быть, так как ваши данные уже отсортированы - это синтаксический анализ в потоковом режиме и добавление каждой строки с одним и тем же ключом корреляции к некоторой структуре hashmap. Когда встречается новый корреляционный ключ, вы, по сути, хотите «сбросить» хэш-карту, чтобы создать новое сообщение, а затем перезапустить тот же процесс. Посмотрите здесь: http://camel.apache.org/how-do-i-write-a-custom-processor-which-sends-multiple-messages.html