2015-06-18 6 views
13

Я использую RabbitMQ в C# с библиотекой EasyNetQ. Здесь я использую шаблон pub/sub. У меня все еще есть несколько вопросов, на которые я надеюсь, что кто-нибудь может мне помочь:Как сделать обработку ошибок с EasyNetQ/RabbitMQ

  1. При возникновении ошибки при использовании сообщения оно автоматически перемещается в очередь ошибок. Как я могу выполнить повторные попытки (чтобы он был помещен обратно в исходную очередь, и когда он не обрабатывал X раз, он перемещается в очередь с мертвой буквой)?
  2. Насколько я вижу, всегда существует 1 очередь ошибок, которая используется для вывода сообщений из всех остальных очередей. Как я могу иметь 1 очередь ошибок для каждого типа, так что каждая очередь имеет свою собственную связанную очередь ошибок?
  3. Как я могу легко повторить сообщения, находящиеся в очереди ошибок? Я пробовал Hosepipe, но он просто переиздает сообщения в очередь ошибок вместо исходной очереди. Мне не нравится этот вариант, потому что я не хочу играть в консоли. Предпочтительно, я просто программировал бы против очереди ошибок.

Кто-нибудь?

ответ

10

Проблема, с которой вы сталкиваетесь с EasyNetQ/RabbitMQ, заключается в том, что она намного более «сырая» по сравнению с другими службами обмена сообщениями, такими как SQS или Azure Service Bus/Queues, но я сделаю все возможное, чтобы указать вам на правильное направление.

Вопрос 1.

Это будет на вас сделать. Самый простой способ - вы не можете отправить сообщение в RabbitMQ/EasyNetQ, и оно будет помещено во главе очереди для повторной попытки. Это не рекомендуется, потому что он будет почти сразу же повторен (без временной задержки), а также будет блокировать обработку других сообщений (если у вас есть один абонент с номером предварительной выборки 1).

Я видел другие реализации использования «MessageEnvelope». Таким образом, класс-оболочка, который, когда сообщение терпит неудачу, увеличивает значение переменной повтора в MessageEnvelope и возвращает сообщение обратно в очередь. Вы должны сделать это и написать код обертывания вокруг ваших обработчиков сообщений, это не будет функцией EasyNetQ.

Используя вышеизложенное, я также видел, как люди используют конверты, но допускают, чтобы сообщение было мертвым. Когда он находится в очереди с мертвой буквой, в очереди с мертвой буквой появляется другое приложение/работник, читающий элементы.

Все эти подходы выше имеют небольшую проблему, так как на самом деле нет хорошего способа иметь логарифмическую/экспоненциальную/любую возрастающую задержку при обработке сообщения. Вы можете «удерживать» сообщение в коде некоторое время, прежде чем возвращать его в очередь, но это не очень хорошо.

Из всех этих вариантов возможно, что ваше собственное приложение, просматривающее очередь с мертвой буквой, и решение о перенаправлении сообщения на основе конверта, содержащего счетчик повторных попыток, является, пожалуй, лучшим способом.

Вопрос 2.

Вы можете указать мертвый обмен письма по очередям с помощью расширенного API. (https://github.com/EasyNetQ/EasyNetQ/wiki/The-Advanced-API#declaring-queues). Однако это означает, что вам придется использовать расширенный API почти везде, так как использование простой реализации IBus для подписки/публикации ищет очереди, имена которых основаны как на типе сообщения, так и на имени абонента.Использование пользовательского объявления очереди означает, что вы будете обрабатывать наименования своих очередей самостоятельно, а это значит, что когда вы подписываетесь, вам нужно знать имя того, что вы хотите и т. Д. Вам больше не нужно подписываться на авто!

Вопрос 3

сообщение об ошибке Очередь/Dead Letter Queue это просто еще одна очередь. Вы можете слушать эту очередь и делать то, что вам нужно сделать. Но на самом деле не существует какого-либо из готового решения, которое звучит так, как будто оно соответствует вашим потребностям.

+0

Мы обнаружили, что практической пользы для стандартной реализации EasyNetQ не было за пределами одной быстрой демонстрации, привязанной к некоторым общим классам .NET, для пользователей, которые впервые используют их. После этого переключитесь на простое «продвинутое» приложение Easy «API, да, вы можете делать продвинутые вещи, но, честно говоря, это очень простой API для использования. Определенно, поклонник Easy's Advanced API для любой работы. –

5

Я реализовал именно то, что вы описали. Вот несколько советов, основанных на моем опыте и связанных с каждым из ваших вопросов.

Q1 (как раз повторить X):

Для этого вы можете использовать IMessage.Body.BasicProperties.Headers. Когда вы отправляете сообщение из очереди ошибок, просто добавьте заголовок с выбранным именем. Ищите этот заголовок для каждого сообщения, входящего в очередь ошибок, и увеличивайте его. Это даст вам количество повторных попыток.

Очень важно, что у вас есть стратегия для того, что делать, когда сообщение превышает ограничение на повторение X. Вы не хотите терять это сообщение. В моем случае я пишу сообщение на диск в этот момент. Это дает вам много полезной информации для отладки, чтобы вернуться к ней позже, потому что EasyNetQ автоматически обертывает ваше исходное сообщение информацией об ошибках. У него также есть исходное сообщение, чтобы вы могли, если хотите, вручную (или, может быть, автоматизировать, с помощью какого-либо кода повторной обработки партии) запрашивать сообщение позже каким-либо контролируемым образом.

Вы можете посмотреть код в утилите Hosepipe, чтобы увидеть хороший способ сделать это. Фактически, если вы следуете шаблону, который видите там, вы можете даже использовать Hosepipe позже, чтобы запросить сообщения, если вам нужно.

Q2 (как создать очереди ошибок по очередям инициирующей):

Вы можете использовать EasyNetQ Advanced Bus, чтобы сделать это чисто. Используйте IBus.Advanced.Container.Resolve<IConventions>, чтобы получить интерфейс условностей. Затем вы можете установить соглашения для именования очереди ошибок с conventions.ErrorExchangeNamingConvention и conventions.ErrorQueueNamingConvention. В моем случае я устанавливаю соглашение на основе имени исходной очереди, чтобы каждый раз, когда я создавал очередь, я получал очередь очередей/queue_error.

Q3 (как обрабатывать сообщения в очереди ошибок):

Вы можете объявить потребитель для ошибки в очереди так же, как вы делаете какую-либо другую очередь. Опять же, AdvancedBus позволяет вам сделать это чисто, указав, что тип, выходящий из очереди, - EasyNetQ.SystemMessage.Error. Итак, IAdvancedBus.Consume<EasyNetQ.SystemMessage.Error>() доставит вас туда. Повторная попытка просто означает повторную публикацию исходного обмена (обращая внимание на счет повтора, который вы помещаете в заголовок (см. Мой ответ на Q1, выше), а информация в сообщении об ошибке, которую вы уничтожили из очереди ошибок, может помочь вам найти цель для

+0

У вас есть рабочий пример? –

+0

У меня это работает, см. Пример здесь http://stackoverflow.com/questions/32077044/re-queue-message-on-exception –