2017-02-13 15 views
1

Я застрял в относительно сложной конфигурации цепи сельдерея, пытаясь добиться следующего. Предположим, что есть цепочка задач, таких как следующие:Использование результата группы в цепочке сельдерея

chain1 = chain(
    DownloadFile.s("http://someserver/file.gz"), # downloads file, returns temp file name 
    UnpackFile.s(), # unpacks the gzip comp'd file, returns temp file name 
    ParseFile.s(), # parses file, returns list URLs to download 
) 

Теперь я хочу, чтобы загрузить каждый URL параллельно, так что я сделал:

urls = chain1.get() 
download_tasks = map(lambda x: DownloadFile.s(x), urls) 
res1 = celery.group(download_tasks)() 
res1_data = res1.get() 

Наконец, я хочу взять каждый загруженный файл (имя временного файла возвращается из DownloadFile) вернулся из ParseFile и запустить его через другую цепочку задач, параллельно (например, это будет group из chain s):

chains = [] 
for tmpfile in res: 
    chains.append(celery.chain(
     foo.s(tmpfile), 
     bar.s(), 
     baz.s() 
    )) 

res2 = celery.group(*chains)() 
res2_data = res2.get() 

Подход работает отлично, если я запускаю его в обычном Python-процессе (не в другой задаче celery), потому что я могу ждать результатов от chain1, а затем построить группу задач загрузки и новые цепочки для каждого загруженного файла.

Однако, теперь я хочу, чтобы обернуть все эти вещи в другую задачу сельдерея, обернув его в украшенной функции другой @app.task, и оказывается, что вы не можете позвонить (или на самом деле не должны вызывать .get() изнутри задача дождаться завершения еще одной задачи), и мне не удалось найти решение «портировать» этот рабочий процесс для выполнения внутри задачи. Я попытался добавить res1 в цепочку chain1, но сельдерей жалуется, что <GroupResult: ..... > is not JSON serializable.

Может ли кто-нибудь предложить способ заставить его работать? Благодаря!

+0

вы заинтересованы результатами вашей обработки foo_bar_baz? Я имею в виду, достаточно ли знать, что обработка была выполнена (например, вы очищаете файл и сохраняете его где-то) – arthur

ответ

1

Действительно, вам сложно позвонить .get() внутри задачи. Цель Celery заключается в параллельном выполнении асинхронных задач, поэтому вы не должны ждать результатов.

Один из способов решения вашей проблемы - сохранить результат URL-адреса вашей первой обработки (либо в файлах, либо в базе данных).

Я написал короткий пример того, что вы можете сделать, написав результаты в файлах. Я выбрал json демпинг.

Предположим, у вас есть список urls в вашем main. Сначала вы запускаете асинхронную обработку всех этих URL-адресов с groupchain. Все эти задачи будут обрабатывать URL-адреса и сохранять список URL-адресов для загрузки в файлах, расположенных в указанном каталоге tmp.

Затем вы также запускаете задачу check_dir, которая будет проверять каталог, если файлы были написаны, и в этом случае обрабатывать каждый файл и удалять соответствующий файл в каталоге tmp.

С параметрами, которые я выбрал, эти задачи автоматически запускаются каждые 30 секунд и никогда не заканчиваются (я полагал, что у вас была повторяющаяся работа для выполнения), поэтому вы можете изменить это, но это должно было дать вам представление о том, как вы могли бы справиться.

Я запустил его как main, но может также поместить его в другую задачу по сельдерию, если вы хотите.

app_module.py

from __future__ import absolute_import 
from celery import Celery 

app = Celery('app') 
app.config_from_object("settings") 

if __name__ == '__main__': 
    app.start() 

задачи.ру

from celery import group, chain 
from app_module import app 

import json 
import glob 
import os 

__all__ = ('download_file', 
       'unpack_file', 
       'parse_file', 
       'foo', 
       'bar', 
       'process_downloaded_file', 
       'check_dir',) 

path = "./data/tmp_dir/" 

@app.task 
def download_file(filename): 
    return filename 

@app.task 
def unpack_file(filename): 
    return "unzipped_" + filename 

@app.task 
def parse_file(filename): 
    # Fake parse task storing results in a temp directory 
    # results are stored in a json and contains the list of urls 
    with open(path + filename, "wb") as f: 
     d = {"files" : [filename+"_" + str(i) for i in range(0,5)]} 
     json.dump(d, f) 
    return True 

@app.task 
def foo(filename): 
    return "foo_" + filename 

@app.task 
def bar(filename): 
    return "bar_" + filename 


@app.task 
def process_downloaded_file(filename): 
    #process one file in the temp directory and at the end delete the file so it 
    # is not processed several times 
    with open(filename, "rb") as f: 
     d = json.load(f) 
    g = group(chain(download_file.s(f), foo.s(), bar.s()) for f in d["files"]).apply_async() 
    os.remove(filename) 
    return True 

@app.task(bind=True) 
def check_dir(self, tmp_dir, sleep=30): 
    #this task checks the tmp directory. If files have been written it processes 
    #every file in the directory. The task autoretries each *sleep* seconds 
    for f in glob.glob(tmp_dir + "*"): 
     process_downloaded_file.delay(f) 
    self.retry(args=(tmp_dir, sleep), countdown=sleep) 

main.py

from celery import group, chain 
from tasks import * 

path = "./data/tmp_dir/" 
urls = ["file1", "file2"] 
group(chain(download_file.s(f), unpack_file.s(), parse_file.s()) for f in urls).apply_async() 
check_dir.delay(path) 

консольный вывод:

[2017-02-14 18:10:41,630: INFO/MainProcess] Received task: arthur.tasks.download_file[65cb06c6-b8b6-4108-af36-84103037e4a2] 
[2017-02-14 18:10:41,632: INFO/MainProcess] Received task: arthur.tasks.download_file[d069e046-4153-4320-8f9d-a22adeeb2827] 
[2017-02-14 18:10:41,637: INFO/MainProcess] Received task: arthur.tasks.check_dir[19a1984f-c2ed-4de6-82d6-b5ad5a6bacc5] 
[2017-02-14 18:10:41,666: INFO/MainProcess] Received task: arthur.tasks.unpack_file[e9eab102-8ae0-4000-b384-5cfa0e01e805] 
[2017-02-14 18:10:41,674: INFO/MainProcess] Task arthur.tasks.download_file[65cb06c6-b8b6-4108-af36-84103037e4a2] succeeded in 0.0389260330703s: u'file1' 
[2017-02-14 18:10:41,682: INFO/MainProcess] Received task: arthur.tasks.unpack_file[47b13b21-57e2-44be-82dd-f8e0e1adff2e] 
[2017-02-14 18:10:41,689: INFO/MainProcess] Task arthur.tasks.download_file[d069e046-4153-4320-8f9d-a22adeeb2827] succeeded in 0.0534016339807s: u'file2' 
[2017-02-14 18:10:41,691: INFO/MainProcess] Received task: arthur.tasks.parse_file[bd3fa287-9cf0-4802-88ca-2593c27af4f7] 
[2017-02-14 18:10:41,696: INFO/MainProcess] Task arthur.tasks.unpack_file[47b13b21-57e2-44be-82dd-f8e0e1adff2e] succeeded in 0.00816849502735s: u'unzipped_file2' 
[2017-02-14 18:10:41,704: INFO/MainProcess] Received task: arthur.tasks.process_downloaded_file[1b72f409-f5b5-480a-b651-616dc02b2207] 
[2017-02-14 18:10:41,706: INFO/MainProcess] Task arthur.tasks.parse_file[bd3fa287-9cf0-4802-88ca-2593c27af4f7] succeeded in 0.00894999306183s: True 
[2017-02-14 18:10:41,708: INFO/MainProcess] Task arthur.tasks.check_dir[19a1984f-c2ed-4de6-82d6-b5ad5a6bacc5] retry: Retry in 30s 
[2017-02-14 18:10:41,709: INFO/MainProcess] Received task: arthur.tasks.process_downloaded_file[1d8d340f-61f7-4ef3-a90e-913a3bfb5478] 
[2017-02-14 18:10:41,713: INFO/MainProcess] Task arthur.tasks.unpack_file[e9eab102-8ae0-4000-b384-5cfa0e01e805] succeeded in 0.044072615914s: u'unzipped_file1' 
[2017-02-14 18:10:41,714: INFO/MainProcess] Received task: arthur.tasks.check_dir[19a1984f-c2ed-4de6-82d6-b5ad5a6bacc5] eta:[2017-02-14 17:11:11.692241+00:00] 
[2017-02-14 18:10:41,717: INFO/MainProcess] Received task: arthur.tasks.parse_file[e839826a-dfa5-4df0-a716-9c21371c297f] 
[2017-02-14 18:10:41,720: INFO/MainProcess] Received task: arthur.tasks.download_file[743153f9-9c92-430e-84f5-7d99a269c104] 
[2017-02-14 18:10:41,724: INFO/MainProcess] Task arthur.tasks.process_downloaded_file[1b72f409-f5b5-480a-b651-616dc02b2207] succeeded in 0.0153999190079s: True 
[2017-02-14 18:10:41,725: INFO/MainProcess] Task arthur.tasks.parse_file[e839826a-dfa5-4df0-a716-9c21371c297f] succeeded in 0.00395095907152s: True 
[2017-02-14 18:10:41,726: INFO/MainProcess] Task arthur.tasks.download_file[743153f9-9c92-430e-84f5-7d99a269c104] succeeded in 0.00449692492839s: u'unzipped_file1_0' 
[2017-02-14 18:10:41,727: INFO/MainProcess] Received task: arthur.tasks.download_file[a29470d7-85a0-4a91-a410-2e51cff81cea] 
[2017-02-14 18:10:41,728: INFO/MainProcess] Task arthur.tasks.process_downloaded_file[1d8d340f-61f7-4ef3-a90e-913a3bfb5478] succeeded in 0.0129376259865s: True 
[2017-02-14 18:10:41,729: INFO/MainProcess] Received task: arthur.tasks.download_file[44a1cc48-52a4-4548-a862-48d402dd92f1] 
[2017-02-14 18:10:41,731: INFO/MainProcess] Received task: arthur.tasks.download_file[cedf91b2-6e3f-48c3-880e-b80a1c38efed] 
[2017-02-14 18:10:41,733: INFO/MainProcess] Task arthur.tasks.download_file[a29470d7-85a0-4a91-a410-2e51cff81cea] succeeded in 0.003385586082s: u'unzipped_file1_1' 
[2017-02-14 18:10:41,734: INFO/MainProcess] Task arthur.tasks.download_file[44a1cc48-52a4-4548-a862-48d402dd92f1] succeeded in 0.00395720102824s: u'unzipped_file1_2' 
[2017-02-14 18:10:41,735: INFO/MainProcess] Received task: arthur.tasks.download_file[d93a7260-43dc-4e77-b5ff-ce0e3bc426ce] 
[2017-02-14 18:10:41,739: INFO/MainProcess] Task arthur.tasks.download_file[d93a7260-43dc-4e77-b5ff-ce0e3bc426ce] succeeded in 0.00272180500906s: u'unzipped_file1_4' 
[2017-02-14 18:10:41,740: INFO/MainProcess] Task arthur.tasks.download_file[cedf91b2-6e3f-48c3-880e-b80a1c38efed] succeeded in 0.00340146606322s: u'unzipped_file1_3' 
[2017-02-14 18:10:41,740: INFO/MainProcess] Received task: arthur.tasks.download_file[f1b19d02-a97d-4e32-afde-e39d46d45bad] 
[2017-02-14 18:10:41,742: INFO/MainProcess] Received task: arthur.tasks.download_file[4a0bce55-8662-42a6-a19d-3ff33496d7e0] 
[2017-02-14 18:10:41,745: INFO/MainProcess] Received task: arthur.tasks.download_file[a759d6a1-a558-46ba-8ee1-2cb28cbe0655] 
[2017-02-14 18:10:41,747: INFO/MainProcess] Task arthur.tasks.download_file[f1b19d02-a97d-4e32-afde-e39d46d45bad] succeeded in 0.00358341098763s: u'unzipped_file2_0' 
[2017-02-14 18:10:41,748: INFO/MainProcess] Task arthur.tasks.download_file[4a0bce55-8662-42a6-a19d-3ff33496d7e0] succeeded in 0.0044348789379s: u'unzipped_file2_1' 
[2017-02-14 18:10:41,749: INFO/MainProcess] Received task: arthur.tasks.foo[e3250c36-92e9-4f53-afef-fe95b035e0dd] 
[2017-02-14 18:10:41,752: INFO/MainProcess] Received task: arthur.tasks.download_file[3e9db0d1-31c5-4703-8e9d-c2b9f4237d8d] 
[2017-02-14 18:10:41,754: INFO/MainProcess] Task arthur.tasks.download_file[a759d6a1-a558-46ba-8ee1-2cb28cbe0655] succeeded in 0.00349929102231s: u'unzipped_file2_2' 
[2017-02-14 18:10:41,755: INFO/MainProcess] Task arthur.tasks.foo[e3250c36-92e9-4f53-afef-fe95b035e0dd] succeeded in 0.00417044304777s: u'foo_unzipped_file1_0' 
[2017-02-14 18:10:41,755: INFO/MainProcess] Received task: arthur.tasks.download_file[dcda209f-f4be-4697-84c1-e55a8502a45c] 
[2017-02-14 18:10:41,757: INFO/MainProcess] Received task: arthur.tasks.foo[3e9db173-7200-4c46-aade-72be5553b0cf] 
[2017-02-14 18:10:41,760: INFO/MainProcess] Task arthur.tasks.download_file[3e9db0d1-31c5-4703-8e9d-c2b9f4237d8d] succeeded in 0.00325334002264s: u'unzipped_file2_3' 
[2017-02-14 18:10:41,760: INFO/MainProcess] Task arthur.tasks.download_file[dcda209f-f4be-4697-84c1-e55a8502a45c] succeeded in 0.00384710694198s: u'unzipped_file2_4' 
[2017-02-14 18:10:41,761: INFO/MainProcess] Received task: arthur.tasks.foo[d936cddc-027c-4640-8a0b-26a7d9723ccc] 
[2017-02-14 18:10:41,764: INFO/MainProcess] Received task: arthur.tasks.foo[11ae4aef-1af9-43a0-94b8-7b95575cd1bc] 
[2017-02-14 18:10:41,765: INFO/MainProcess] Task arthur.tasks.foo[3e9db173-7200-4c46-aade-72be5553b0cf] succeeded in 0.00316555600148s: u'foo_unzipped_file1_1' 
[2017-02-14 18:10:41,766: INFO/MainProcess] Task arthur.tasks.foo[d936cddc-027c-4640-8a0b-26a7d9723ccc] succeeded in 0.00383736204822s: u'foo_unzipped_file1_2' 
[2017-02-14 18:10:41,767: INFO/MainProcess] Received task: arthur.tasks.foo[9e60adad-57e2-4a6e-874d-c687df189714] 
[2017-02-14 18:10:41,769: INFO/MainProcess] Received task: arthur.tasks.foo[8b1eebb8-abb0-4223-872c-e9687031380c] 
[2017-02-14 18:10:41,771: INFO/MainProcess] Task arthur.tasks.foo[11ae4aef-1af9-43a0-94b8-7b95575cd1bc] succeeded in 0.00347809505183s: u'foo_unzipped_file1_3' 
[2017-02-14 18:10:41,772: INFO/MainProcess] Task arthur.tasks.foo[9e60adad-57e2-4a6e-874d-c687df189714] succeeded in 0.00403305899817s: u'foo_unzipped_file1_4' 
[2017-02-14 18:10:41,773: INFO/MainProcess] Received task: arthur.tasks.foo[f9c137d7-4087-4519-919d-62bba457747f] 
[2017-02-14 18:10:41,775: INFO/MainProcess] Received task: arthur.tasks.foo[2a43d460-aceb-465e-8be5-678cb930a60e] 
[2017-02-14 18:10:41,777: INFO/MainProcess] Task arthur.tasks.foo[8b1eebb8-abb0-4223-872c-e9687031380c] succeeded in 0.00311726506334s: u'foo_unzipped_file2_0' 
[2017-02-14 18:10:41,778: INFO/MainProcess] Task arthur.tasks.foo[f9c137d7-4087-4519-919d-62bba457747f] succeeded in 0.00378636294045s: u'foo_unzipped_file2_1' 
[2017-02-14 18:10:41,778: INFO/MainProcess] Received task: arthur.tasks.bar[770d4cd4-527c-4efe-975f-daf337934c78] 
[2017-02-14 18:10:41,780: INFO/MainProcess] Received task: arthur.tasks.foo[c09677f9-183e-43ef-889c-c8b7cab2bd23] 
[2017-02-14 18:10:41,783: INFO/MainProcess] Task arthur.tasks.foo[2a43d460-aceb-465e-8be5-678cb930a60e] succeeded in 0.00324743904639s: u'foo_unzipped_file2_2' 
[2017-02-14 18:10:41,783: INFO/MainProcess] Task arthur.tasks.bar[770d4cd4-527c-4efe-975f-daf337934c78] succeeded in 0.00382692192215s: u'bar_foo_unzipped_file1_0' 
[2017-02-14 18:10:41,784: INFO/MainProcess] Received task: arthur.tasks.foo[1a6294da-8cae-4bf1-9d56-be5972254e07] 
[2017-02-14 18:10:41,787: INFO/MainProcess] Received task: arthur.tasks.bar[bd15721f-3bea-4c64-a0c4-59c5c8730171] 
[2017-02-14 18:10:41,788: INFO/MainProcess] Task arthur.tasks.foo[c09677f9-183e-43ef-889c-c8b7cab2bd23] succeeded in 0.00343648903072s: u'foo_unzipped_file2_4' 
[2017-02-14 18:10:41,789: INFO/MainProcess] Task arthur.tasks.foo[1a6294da-8cae-4bf1-9d56-be5972254e07] succeeded in 0.00413183600176s: u'foo_unzipped_file2_3' 
[2017-02-14 18:10:41,790: INFO/MainProcess] Received task: arthur.tasks.bar[29a982bd-2a72-49e7-bc56-0f2a4b2ba947] 
[2017-02-14 18:10:41,792: INFO/MainProcess] Received task: arthur.tasks.bar[5944c49d-428d-4237-8777-edec76b36512] 
[2017-02-14 18:10:41,794: INFO/MainProcess] Task arthur.tasks.bar[bd15721f-3bea-4c64-a0c4-59c5c8730171] succeeded in 0.0031840458978s: u'bar_foo_unzipped_file1_2' 
[2017-02-14 18:10:41,795: INFO/MainProcess] Task arthur.tasks.bar[29a982bd-2a72-49e7-bc56-0f2a4b2ba947] succeeded in 0.00374374503735s: u'bar_foo_unzipped_file1_1' 
[2017-02-14 18:10:41,796: INFO/MainProcess] Received task: arthur.tasks.bar[12272aad-f6e6-432e-945a-363a678ba2a8] 
[2017-02-14 18:10:41,798: INFO/MainProcess] Task arthur.tasks.bar[5944c49d-428d-4237-8777-edec76b36512] succeeded in 0.00241802399978s: u'bar_foo_unzipped_file1_4' 
[2017-02-14 18:10:41,798: INFO/MainProcess] Received task: arthur.tasks.bar[493cc5cc-797b-40f3-87a7-1394af1ae45d] 
[2017-02-14 18:10:41,801: INFO/MainProcess] Received task: arthur.tasks.bar[e2925c2b-426d-4076-8a8c-c67c56a2ab8e] 
[2017-02-14 18:10:41,803: INFO/MainProcess] Task arthur.tasks.bar[12272aad-f6e6-432e-945a-363a678ba2a8] succeeded in 0.00308170204517s: u'bar_foo_unzipped_file1_3' 
[2017-02-14 18:10:41,804: INFO/MainProcess] Task arthur.tasks.bar[493cc5cc-797b-40f3-87a7-1394af1ae45d] succeeded in 0.00375492009334s: u'bar_foo_unzipped_file2_0' 
[2017-02-14 18:10:41,804: INFO/MainProcess] Received task: arthur.tasks.bar[c4b4e9de-4ce7-476f-b275-278db3d8099f] 
[2017-02-14 18:10:41,807: INFO/MainProcess] Received task: arthur.tasks.bar[b0cdb87c-292f-4f14-975c-c7bd4679373d] 
[2017-02-14 18:10:41,808: INFO/MainProcess] Task arthur.tasks.bar[c4b4e9de-4ce7-476f-b275-278db3d8099f] succeeded in 0.00304232595954s: u'bar_foo_unzipped_file2_2' 
[2017-02-14 18:10:41,809: INFO/MainProcess] Task arthur.tasks.bar[e2925c2b-426d-4076-8a8c-c67c56a2ab8e] succeeded in 0.00377448496874s: u'bar_foo_unzipped_file2_1' 
[2017-02-14 18:10:41,810: INFO/MainProcess] Received task: arthur.tasks.bar[3570e196-7c41-43b1-b7ef-68b2d31f28a2] 
[2017-02-14 18:10:41,813: INFO/MainProcess] Task arthur.tasks.bar[b0cdb87c-292f-4f14-975c-c7bd4679373d] succeeded in 0.00181642104872s: u'bar_foo_unzipped_file2_4' 
[2017-02-14 18:10:41,813: INFO/MainProcess] Task arthur.tasks.bar[3570e196-7c41-43b1-b7ef-68b2d31f28a2] succeeded in 0.00239081599284s: u'bar_foo_unzipped_file2_3' 
+0

Спасибо за подробный ответ, я соглашусь с ним, потому что я на самом деле закончил схожую вещь. Тем не менее, одна проблема осталась, заключается в том, что сельдерей, видимо, не ждет, когда «цепочки» в «группе» закончат конец, вернет свои результаты (группа вернется, как только цепочки будут выполнять свои задачи, запланированные для запуска), поэтому я думаю нет никакого приятного и простого способа получить результат в этом сценарии, не записывая файлы или базу данных. – madprogrammer

+0

Действительно, сельдерей не дожидается окончания группы. но если вам нужно ждать результатов, то сельдерей может быть не тем, что вам нужно. Я не вижу ни одного хорошего рабочего процесса сельдерея для решения вашего сценария, потому что вам нужно дождаться первого шага до начала второго шага. – arthur

+0

, но приятной особенностью группы является то, что вы ее сохраняете и восстанавливаете. например, в задаче 'process_downloaded_file' вы можете сделать:' g.save() 'и' print (g.id) '. Затем в другой части кода, который не находится в задаче, вы можете сделать 'g = app.GroupResult.restore (the_group_id)' и 'g.get()' – arthur

 Смежные вопросы

  • Нет связанных вопросов^_^