2016-04-05 9 views
1

У меня есть некоторые проблемы с следующим маршрутом:Apache Camel EIP - Как остановить раскол()

// from("cxf:....")... 
from("direct:start").process(startRequestProcessor) // STEP 1 
      .choice() 
       .when(body().isNull()) 
         .to("direct:finish") 
       .otherwise() 
        .split(body()) // STEP 2 
        .bean(TypeMapper.class) // STEP 3 
        .log("Goes to DynamicRouter:: routeByTypeHeader with header: ${headers.type}") 
        .recipientList().method(Endpoint1DynamicRouter.class, "routeByTypeHeader") // STEP 4 
        .ignoreInvalidEndpoints(); 

    from("direct:endpoint2") // STEP 6 
      .log("Goes to DynamicRouter::routeByCollectionHeader with header: ${headers.collection}") 
      .recipientList().method(Endpoint2DynamicRouter.class, "routeByCollectionHeader") 
      .ignoreInvalidEndpoints(); 

    from("direct:endpoint1.1") // STEP 5 
      .process(new DateRangeProcessor()) 
      .to("direct:collections"); 

    from("direct:endpoint1.2") // STEP 5 
      .process(new SingleProcessor()) 
      .to("direct:collections"); 


    from("direct:endpoint2.2") // STEP 7 
      .aggregate(header("collection" /** endpoint2.2 */), CollectionAggregationStrategy) 
      .completionSize(exchangeProperty("endpoint22")) 

      .process(new QueryBuilderProcessor()) 
      .bean(MyService, "getDbCriteria") 

      .setHeader("collection", constant("endpoint2.1")) 
      .to("direct:endpoint2.1").end(); 


    from("direct:endpoint2.1") // STEP 8 
      .aggregate(header("collection" /** endpoint2.1 */), CollectionAggregationStrategy) 
      .completionSize(exchangeProperty("CamelSplitSize")) 
      .to("direct:finish").end(); 

    from("direct:finish") 
      .process(new QueryBuilderProcessor()) 
      .bean(MyRepository, "findAll") 
      .log("ResponseData: ${body}"). 
      marshal().json(JsonLibrary.Gson).end(); 

Маршрут

  1. получает JSON строки с преобразует его в список (HashSet) из JSONObjects.
  2. разделить полученный список на json-объекты.
  3. Установить соответствующие заголовки в соответствии с содержанием объекта
  4. Маршруты сообщения в соответствии с заголовками, чтобы endpoint1.1 или endpoint1.2
  5. Преобразование сообщений MongoDB Критерии и отправить на endpoint2
  6. Endpoint2 маршрутизацию сообщений в соответствии с другим заголовком к endpoint2.1 или endpoint2.2.
  7. Endpoint2.2 объединяет все принятые сообщения, обрабатывает его, чтобы получить mongodb Критерии и отправляет его в конечную точку2.1 (Complesize вычисляется на шаге 2 и сохраняется в свойстве «endpoint22»).
  8. Enpoint2.1 агрегирует ВСЕ сообщения (CamelSplitSize) преобразует агрегированные сообщения в объект Query и отправляет их в репозиторий для извлечения данных.

Я могу видеть правильный объект ответа в отладчике, но в любом случае я получаю сообщение об ошибке:

No message body writer has been found for class java.util.HashSet, ContentType: application/json

Проблема заключается не в объекте ответа, как он работает с другими маршрутами и не содержит HashSets.

Я думаю, что маршрут посылает вывода HashSet созданный тат ШАГ 1 ...

Мои вопросы:

  • что неправильно в выводе маршрута?
  • как recipientList() пытаются направить сообщения недействительных конечной точки (я должен использовать .ignoreInvalidEndpoints(), чтобы избежать исключения):

    org.apache.camel.NoSuchEndpointException: No endpoint could be found for: [email protected], please check your classpath contains the needed Camel component jar.

Любая помощь будет высоко ценится! Спасибо.

ответ

2

Мне очень странно, но функция .ggregate() не отвечает на обмен. Он использует стратегию агрегации, но всегда отвечает на входящий обмен. Это неясно при чтении документации, но вам нужно использовать стратегию агрегации вместе с split(), чтобы иметь возможность возвращать обмен.