2016-07-29 2 views
2

Я пытаюсь понять основы агрегатора. Ниже приведен пример использования, который я пытаюсь реализовать:Не удалось заставить агрегатор работать

1) Прочитайте сообщение (детали заказа) из очереди.

<?xml version="1.0" encoding="UTF-8"?> 
<order xmlns="http://www.example.org/orders"> 
    <orderItem> 
    <isbn>12333454443</isbn> 
    <quantity>4</quantity> 
    </orderItem> 
    <orderItem> 
    <isbn>545656777</isbn> 
    <quantity>50</quantity> 
    </orderItem> 
.. 
.. 
</order> 

Один заказ сообщение будет содержать несколько OrderItem. И мы можем ожидать, что сотни заказывают сообщений в очереди.

2) Конечный результат ::

а) Каждый OrderItem должны быть записаны в файл.

b) такие файлы должны быть записаны в уникальную папку.

Для примера, предположим, что мы получили два порядка сообщений - каждый из которых содержит три OrderItem.

Так что нам нужно создать 2 папки:

In "папке 1", там должно быть 4 файла (1 OrderItem в каждом файле)

В "папке 2", там должно быть 2 файлы (1 orderitem в каждом файле). Здесь для простоты мы принимаем не более заказ Пришло сообщение, и мы можем написать через 5 минут.

Реализация:


  1. Я могу прочитать сообщение из очереди (WebSphere MQ) и маршализацию сообщение успешно.
  2. Используется сплиттер для разделения сообщения на основе orderitem count.
  3. Б агрегатор группировать сообщение размером 4.

я смог агрегатор работать в моем понимании.

  1. я нажимаю один заказ когда 4 OrderItem, сообщение становится агрегированной правильно.
  2. Я нажимаю один заказ с 5 orderitem, первые 4 объединяются, но последний отправляется для отказа от канала. Это ожидается, когда MessageGroup будет выпущена, поэтому последнее сообщение будет отброшено.
  3. Я нажимаю два заказы каждый содержит 2 orderitem. Последние 2 orderitem отправляются на канал сброса.
    Стратегия корреляции жестко запрограммирована (OrderAggregator.java), но вышеприведенный случай должен сработать.

Нужно указать, как реализовать этот прецедент, где я могу сгруппировать их в 4 и записать в уникальные папки. Обратите внимание, что orderitem - это все независимые книжные заказы и не имеют между ними отношения.

Ниже приведена конфигурация.

весна-bean.xml

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans"> 
    <int:channel id="mqInbound"/> 
    <int:channel id="item"/> 
    <int:channel id="itemList"/> 
    <int:channel id="aggregatorDiscardChannel"/> 

    <int-jms:message-driven-channel-adapter id="jmsIn" 
             channel="mqInbound" 
             destination="requestQueue" 
             message- converter="orderMessageConverter"/> 

    <int:splitter input-channel="mqInbound" output-channel="item" expression="payload.orderItem"/> 

    <int:chain id="aggregateList" input-channel="item" output-channel="itemList" > 
    <int:header-enricher> 
     <int:header name="sequenceSize" expression="4" overwrite="true"/> 
    </int:header-enricher> 
    <int:aggregator correlation-strategy="orderAggregator" correlation-strategy-method="groupOrders" discard-channel="aggregatorDiscardChannel" /> 
    </int:chain> 

    <int:service-activator input-channel="itemList"     ref="displayAggregatedList" method="display"/> 
    <int:service-activator input-channel="aggregatorDiscardChannel" ref="displayAggregatedList" method="displayDiscarded"/> 

    <bean id="orderAggregator"  class="com.samples.Aggregator.OrderAggregator"/> 
    <bean id="displayAggregatedList" class="com.samples.Aggregator.DisplayAggregatedList"/> 
    ... 
    .... 
</beans> 

OrderAggregator.java

public class OrderAggregator { 

@Aggregator 
public List<OrderItemType> sendList(List<OrderItemType> orderItemTypeList) { 

    return orderItemTypeList; 
} 

@CorrelationStrategy 
public String groupOrders(OrderItemType orderItemType) { 

    return "items"; 
} 

} 

DisplayAggregatedList.java

public class DisplayAggregatedList { 

public void display(List <OrderItemType> orderItemTypeList) { 

    System.out.println("######## Display Aggregated ##############"); 
    for(OrderItemType oit : orderItemTypeList) { 
     System.out.println("### Isbn :" + oit.getIsbn() + ":: Quantity :" + oit.getQuantity()); 
    } 
} 
public void displayDiscarded(Message<?> message) { 

    System.out.println("######## Display Discarded ##############" + message); 
} 
} 

ответ

1

Что вам нужно, это называется expire-groups-upon-completion:

Когда установлено значение true (по умолчанию false), завершенные группы удаляются из хранилища сообщений, позволяя последующим сообщениям с той же корреляцией формировать новую группу. Поведение по умолчанию - отправлять сообщения с той же корреляцией, что и заполненная группа, в канал сброса.

Если вам нужно в любом случае, чтобы освободить незавершенные группы (2 заказов слева, например), рассмотреть возможность использования group-timeout: http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to

+0

Спасибо @Artem Bilan за ваш ответ. Кажется, что агрегатор работает непоследовательно для моего кода. В зависимости от количества сообщений, которые я нажимаю, иногда это работает. Однако следующая последовательность сообщений ** не работает **. ** 1) ** Сообщение с сообщением заказа с 3 orderitem. (isbn #: 001, 002, 003) ** 2) ** Сообщение о заказе с 1 заказным номером. (isbn #: 004) Сообщения должны быть освобождены после второго сообщения. Однако второе сообщение (Isbn: 004) отображается в канале ** сброса **. –

+0

Ниже scenerio works: ** 1) ** Сообщение с сообщением заказа с 5 orderitem. (isbn #: 011, 012, 013, 014, 015) ** 2) ** Сообщение о заказе заказа с 5 orderitem. (isbn #: 016, 017, 018, 019, 020) ** 3) ** Сообщение о заказе заказа с 2-мя заказами. (isbn #: 021, 022) ** Выпущены три группы сообщений **. группа № 1 (011,012,013,014), группа № 2 (015,016,017,018), группа № 3 (019 020 021 022). Однако по какой-то причине более ранняя последовательность (3 orderitem + 1 orderitem), как разделенная в моем последнем комментарии, не работает –

+0

Пожалуйста, используйте 'expire-groups-upon-completion =" true "и подумайте о том, чтобы использовать' MessageCountReleaseStrategy' для 'release -strategy' –

0

Пожалуйста, используйте истекают-группы, при завершенности = «истина» и считают используйте MessageCountReleaseStrategy` для стратегии выпуска - Artem Bilan

+0

, вы должны просто принять мой ответ вместо этого –