2017-02-07 5 views
6

Недавно я делаю эксперимент по проекту GIT для понимания большой структуры обработки данных.Celery & Rabbitmq: WARNING/MainProcess] Получено и удалено неизвестное сообщение. Неправильное назначение?!? - эксперимент на GIT

1, проект GIT: https://github.com/esperdyne/celery-message-processing

мы имеем следующие компоненты:

1, AMPQ брокер (RabbitMQ): он работает как буфер сообщений, который работает как почтовый ящик для обмена сообщения для разных пользователей!

2, рабочий: он работает как сервисный сервер для предоставления услуг для различных клиентов службы. 3, Очередь («сельдерей»: он работает как контейнер с несколькими обработки, который используется для обработки различных экземпляров рабочих в то же время

конфигурационный ключ можно увидеть, как показано ниже:.

мы используем объект PROJ/celery.py определить приложение, определение можно рассматривать, как показано ниже:

app = Celery('proj', 
     broker='amqp://', 
     backend='redis://localhost', 
     include=['proj.tasks']) 

введите код здесь

когда мы начинаем приложение:

1, когда мы запустили приложение, мы увидели сообщение, которое было создано из rabbitmq, но сельдерей не смог обработать сообщение.

Parse.log выглядит следующим образом: [2017-02-04 14: 28: 06,909: WARNING/MainProcess] Получено и удалено неизвестное сообщение. Неправильное место назначения?!?

мы имеем следующий вопрос:

4.2.1 механизм AMQP enter image description here Мы можем видеть, что AMQP работает как буфер сообщений, то будет отправитель сообщения и Fetcher сообщение:

На приведенной выше диаграмме, кто является отправителем сообщения и кто является сборщиком сообщений.

4.2.2 Определение сообщения В нашем приложении мы не можем найти код для определения сообщения для отправки или получения формы AMQP.

4.2.3 Монитор сообщений Как мы можем отслеживать отправку и получение сообщений в AMQP. Надеемся, что учитель поведет нас, чтобы решить эту проблему, и дайте нам несколько подробных

введение в мечетничество сельдерея!

Примечание: журнал ошибок можно увидеть здесь

[2017-02-04 14:28:06,909: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!? 



The full contents of the message body was: body: [[u'maildir/allen-  p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'3cafda16-3e7c-44db-b05e-1327ef97ffc3'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'1f4c728b-680d-4dde-98b9-b153d5282780'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f21c911e-f2ac-462e-9662-2efbd27bcf91', u'root_id': None}}]}] (801b) 
{content_type:'application/json' content_encoding:'utf-8' 
    delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 623422L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', 'N\xfd\x17=\x00\x00': '[email protected]', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}} 


[2017-02-04 15:47:22,463: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672// 
[2017-02-04 15:47:22,473: INFO/MainProcess] mingle: searching for neighbors 
[2017-02-04 15:47:23,503: INFO/MainProcess] mingle: sync with 2 nodes 
[2017-02-04 15:47:23,504: INFO/MainProcess] mingle: sync complete 
[2017-02-04 15:47:23,530: INFO/MainProcess] [email protected] ready. 
[2017-02-04 15:47:24,890: INFO/MainProcess] sync with [email protected] 
[2017-02-04 15:47:51,017: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!? 

The full contents of the message body was: body: [[u'maildir/allen-p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'765e5bbe-198f-405c-b10c-023d35e03981'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'7dacb897-d023-40b5-9874-e00b75107bbd'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f0d41289-33e2-4c8c-8d84-9d1d4c5a9c80', u'root_id': None}}]}] (801b) 
{content_type:'application/json' content_encoding:'utf-8' 
    delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 3L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', 'N\xfd\x17=\x00\x00': '[email protected]', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}} 

enter code here 

ответ

10

Было бы полезно дать версии сельдерея и librabbitmq вы используете. Поскольку у меня была очень похожая проблема, я предполагаю, что вы используете celery 4.0.2 и librabbitmq 1.6.1.

В таком случае, это известная проблема совместимости, вы можете обратиться к https://github.com/celery/celery/issues/3675 и https://github.com/celery/librabbitmq/issues/93.

Первая ссылка дает вам рекомендацию, чтобы решить вашу проблему, а именно:

  • деинсталлировать librabbitmq pip uninstall librabbitmq (вы, возможно, придется назвать эту команду много раз)

  • изменения вхождения amqp в pyamqp в ваших письмах borker. (Хотя не в вашем файле конфигурации, если вы используете его. Это не помогло мне).

Чтобы более точно ответить на ваши другие вопросы: вы правы, говоря, что есть отправитель и приманка.

Роль отправителя принимается приложением, созданным при вызове Celery(...). Одна из его роли - действовать как реестр задач, и если вы посмотрите на ее реализацию в app/base.py, вы увидите, что она реализует метод send_task, который непосредственно вызывается методом apply_async класса Task. Роль этого метода заключается в том, чтобы отправить сортированную версию вашей задачи через провод до брокера, чтобы ее можно было получить рабочим. Протокол приложения, используемый для передачи сообщения, - amqp, для которого реализация - librabbitmq.

С другой стороны провода есть другой экземпляр, запущенный работником, который выполняет работу по извлечению. На языке сельдерея он называется Consumer. Вы можете найти его реализацию в employee/consumer/consumer.py. Вы увидите, что он реализует create_task_handler, который в свою очередь определяет функции on_task_received, которые вызывают ошибку, которую вы видите. Это функция, вызываемая при получении новой задачи от рабочего и следующего в строке обработкой.

Предлагаемое решение состоит в изменении реализации протокола amqp, так что TypeError не поднимается в on_task_received (что, по-моему, вызвано проблемой кодирования).

Надеюсь, он ответит на все ваши вопросы и даст вам более четкое представление о том, как работает сельдерей. Я должен закончить, сказав, что, насколько мне известно, «обычное» использование сельдерея никогда не потребует от вас вмешательства в такие внутренние функции и что вы можете достичь 99% того, что вам может понадобиться, например, путем создания пользовательских классов задач и пользовательских бэкэнд ,

+0

Привет Anis: это действительно так НИК e из вас, чтобы помочь мне в этом вопросе! Я должен назвать тебя мистером Фантастическим! 1) pip2.7 установить librabbitmq-1.6.1.tar.gz 2) pip2.7 установить celery-4.0.2.tar.gz. Это точно версия программного обеспечения, которую я установил! Я последовал твоему совету! И теперь мой проект теперь работает! Я так счастлив сегодня вечером! Что хороший друг Анис поможет мне в этом вопросе! – arthur

0

Просто, чтобы ответить и здесь. В потоке Анис относится к 23doors mentions, что новый протокол сельдерея 4 в по умолчанию не играет хорошо с librabbitmq:

Видимо librabbitmq проблема связана с новым протоколом по умолчанию в сельдерее 4.x.

Он также упоминает, что для решения этой проблемы вы можете использовать старые предложения протокола Сельдерея установки (если вы используете Django):

CELERY_TASK_PROTOCOL = 1 

В противном случае вы можете установить следующее в вашем celeryconf.py файл

app.conf.task_protocol = 1 

Все кредит 23doors :)