2016-04-07 4 views
1

У меня есть задачи в цепочке три задачи fetch_page, check_source и странице магазинасельдерея вызова различных функций и продолжить процесс цепочки

def update_page_info(**headers): 
    chain=fetch_page.s(headers['key']) | check_source.s(headers['key_1']) | store_info.s() 
    chain().apply_async() 

fetch_page извлекает страницу и собирает то, что ему нужно собрать:

@app.task(bind=True) 
def fetch_page(self,url): 
    #fetch_page here and return a tuple so that it can be unpacked 
    # dosomething 

После извлечения страницы теперь он проверяет источник в следующих задачах check_source.

@app.task(bind=True) 
def check_source(self,page_and_url,handle): 
    try: 
     #unpack your stuffs here 
     page,url=page_and_url 
     get_result={} 

     if handle=='first_option': 
      get_result=select_first_option(one,two) 
      return get_result 

     elif handle=='second_option': 
      get_result=select_second_option(one,two) 
      return (get_result) 

     elif handle=='third_option': 
      get_result=select_third_option(one,two) 
      return (get_result) 
     else: 
      return "IGNORE FOR NOW" 
    except Exception as exc: 
     pass 

так что путаница - это я могу назвать некоторые другие задачи здесь? будет ли какая-либо несогласованность или будет ли рабочий когда-либо заходить в тупик?

И, наконец, он должен выполнить store_info(), который будет просто хранить вещи вернулись из check_source()

@app.task(bind=True) 
def store_info(self,result): 
    print ("store_info ") 
    try: 
     #store the fetched pages 

    except Exception as exc: 
     #dosomething 
    finally: 
     pass 

Я следовал такой подход, который просто нужно немного модифицировать http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks.

Может кто-нибудь предложить мне, как это должно быть сделано, и вещи, о которых мне нужно быть более осторожными?

ответ

1

Это должно работать так, как будто вы читаете (и общаетесь). Три задачи будут выполняться без всякой «непоследовательности».

Если вы позвоните по телефону update_page_info один раз, то три подзадачи с тремя цепями будут выполняться исключительно друг от друга. Тем не менее, по-прежнему существует потенциал для взаимоблокировок с этой установкой. Если вы вызвали update_page_info во время предыдущих заданий с того момента, как вы его вызвали, вы можете запустить сразу несколько задач. Это может привести к возникновению взаимоблокировок в зависимости от того, как ваши задачи обмениваются ресурсами.

Если ваши задачи обмениваются ресурсами, я бы предложил использовать что-то вроде redis или memcached как систему блокировки между рабочими.

Редактировать: код, который я вижу сейчас, полностью прекрасен, поскольку результаты передаются как параметры для следующей задачи.

+0

Я тестировал это и его рабочий файл до сих пор за несколько задач, просто волнуясь, если рабочие перегружены, я думаю, мне нужно проверить его еще на несколько задач. Благодарю. – ashim888

+1

Тестирование - хорошее место для начала! Позволяет ли ваш сценарий нескольким работникам одновременно выполнять одну и ту же задачу? Если это проблема, вам необходимо убедиться, что сразу можно выполнить только одну задачу. Вы можете сделать это с помощью мьютекса/семафора, с которым работает ваш сельдерей. Что именно вы пытаетесь сделать? – deeb

+0

На данный момент я использую только одного работника для выполнения той же задачи. Я планирую использовать 2/3 рабочих и не знать о мьютексе/семафоре. Я просто пытаюсь выполнить сканирование и сохранить результаты. – ashim888