2017-01-02 5 views
1

Я заинтересован в создании пула работников, the_pool, используя multiprocessing.Pool, который использует связь Queue. Однако у каждого работника есть аргумент role, который является уникальным для этого работника и должен быть предоставлен при инициализации работника. Это ограничение навязывается API, с которым я взаимодействую, и поэтому его нельзя обойти. Если бы я не нуждался в очереди, я мог бы просто перебрать список role значений и вызывать apply_async, например, так:Как передать переменные инициализации для конкретного работника в рабочий пул, если аргументы содержат очередь?

[the_pool.apply_async(worker_main, role) for role in roles] 

К сожалению, Queue объект может быть передан только в бассейнах во время бассейна конкретизации, как:

the_pool = multiprocessing.Pool(3, worker_main, (the_queue,)) 

Попытка передать Queue через аргументы apply_async вызывает ошибку во время выполнения. В следующем примере, адаптированном из this question, мы пытаемся создать пул из трех рабочих. Но пример не работает, потому что нет способа получить элемент ролей от roles до initargs для пула.

import os 
import time 
import multiprocessing 

# A dummy function representing some fixed functionality. 
def do_something(x): 
    print('I got a thing:', x) 

# A main function, run by our workers. (Remove role arg for working example) 
def worker_main(queue, role): 

    print('The worker at', os.getpid(), 'has role', role, ' and is initialized.') 

    # Use role in some way. (Comment out for working example) 
    do_something(role) 

    while True: 
     # Block until something is in the queue. 
     item = queue.get(True) 
     print(item) 
     time.sleep(0.5) 

if __name__ == '__main__': 

    # Define some roles for our workers. 
    roles = [1, 2, 3] 

    # Instantiate a Queue for communication. 
    the_queue = multiprocessing.Queue() 

    # Build a Pool of workers, each running worker_main. 
    # PROBLEM: Next line breaks - how do I pass one element of roles to each worker? 
    the_pool = multiprocessing.Pool(3, worker_main, (the_queue,)) 

    # Iterate, sending data via the Queue. 
    [the_queue.put('Insert useful message here') for _ in range(5)] 

    worker_pool.close() 
    worker_pool.join() 
    time.sleep(10) 

Один тривиальных Обходной включать в initargs, который служит только для связи роли каждого работника второго Queue и блокировать выполнение рабочих до тех пор, пока не получит свою роль с помощью этой очереди. Это, однако, вводит дополнительную очередь, которая не должна быть необходимой. Соответствующая документация - here. Руководство и советы очень приветствуются.

+1

После рассмотрения вопроса и ответа Тима Петерс, я чувствую, что есть конфликт между началом рабочего на бассейн 3 рабочих, и желая делегировать роли к когда они начинаются. Похоже, вы либо хотите пул на роль, либо хотите, чтобы роль была передана как часть элемента очереди, чтобы каждый работник мог выполнять какую-либо роль. –

+0

Justin - В случае, если вы не знаете, обратите внимание, что получение ответа на вопрос Python от Тима Петерса находится на одном уровне, чтобы получить его от Гвидо. Размышляйте над этим. –

+0

@ DanFarrell, вы правы - я хочу, чтобы рабочие процессы были отделены от любого конкретного выбора 'role'. Использование одного пула на роль не будет делать трюк, потому что мне нужно, чтобы пул был разделен по ролям по архитектурным причинам. В настоящее время я передаю значение роли через отдельный объект 'Queue', как вы предложили, в качестве рабочего процесса. Я блокирую выполнение метода init init, пока не будет получено значение «role», а затем сразу же попадет в 'main'.Я надеюсь, что кто-то разработал питоническую идиому, чтобы избежать этой избыточной очереди/блокировки. –

ответ

1

Почему бы не использовать две рабочие функции, одну для инициализации? Как:

def worker_init(q): 
    global queue 
    queue = q 

def worker_main(role): 
    # use the global `queue` freely here 

Инициализация является такой же, как и то, что вы показали, за исключением вызова worker_init:

the_pool = multiprocessing.Pool(3, worker_init, (the_queue,)) 

инициализация выполняется только один раз в рабочий процесс, и каждый процесс повторяется, пока Pool завершается. Для того, чтобы получить работу, делать то, что вы хотите сделать:

[the_pool.apply_async(worker_main, role) for role in roles] 

Там нет необходимости передавать the_queue тоже - каждый рабочий процесс уже узнал об этом во время инициализации.

+0

В этом случае вы получаете только одного работника за роль, правильно? В вопросе есть равное количество ролей в качестве рабочих. Но если бы сказали, что желали 10 желающих и 3 роли, это запустило бы только бегун очереди (предположительно внутри «worker_main» один раз для каждой из трех ролей, правильно? –

+0

Если 'role' остается' [1, 2, 3 ] ', да - код делает именно то, что вы говорите, чтобы сделать ;-) Отсутствие телепатии, я не могу догадаться, что вы должны сделать 10 человек. Если, например, вы хотите, чтобы роли 1 и 2 принимались по 3 работника каждый, а роль 3 - 4 работника, используйте некоторую перестановку «[1, 1, 1, 2, 2, 2, 3, 3, 3, 3] 'для' ролей'. –

+0

Да, на роль будет только один рабочий, и каждая роль будет уникальным целым на практике. Оглядываясь назад, я должен был сделать свой вопрос более общим. Я не считал, что я налагаю ограничения, связанные с реализацией. В общем, я искал идиому, которая позволит мне порождать «n» работников, используя список аргументов, 'my_list', где' len (my_list) 'дает' n'. В моей реальной реализации я инкапсулирую функциональность рабочего объекта в объекты, поэтому выгодно избегать вызова двух разных функций, если это возможно. –

0

Вы можете просто создать очередь с ролями:

import os 
import time 
import multiprocessing 

# A dummy function representing some fixed functionality. 
def do_something(x): 
    print('I got a thing:', x) 

# A main function, run by our workers. (Remove role arg for working example) 
def worker_main(queue, roles): 
    role = roles.get() 
    print('The worker at', os.getpid(), 'has role', role, ' and is initialized.') 

    # Use role in some way. (Comment out for working example) 
    do_something(role) 

    while True: 
     # Block until something is in the queue. 
     item = queue.get(True) 
     print(item) 
     time.sleep(0.5) 

if __name__ == '__main__': 

    # Define some roles for our workers. 
    roles = [1, 2, 3] 

    # Instantiate a Queue for communication. 
    the_queue = multiprocessing.Queue() 
    roles_queue = multiprocessing.Queue() 
    for role in roles: 
     roles_queue.put(role) 


    # Build a Pool of workers, each running worker_main. 
    # PROBLEM: Next line breaks - how do I pass one element of roles to each worker? 
    the_pool = multiprocessing.Pool(3, worker_main, (the_queue, roles_queue)) 

    # Iterate, sending data via the Queue. 
    [the_queue.put('Insert useful message here') for _ in range(5)] 

    worker_pool.close() 
    worker_pool.join() 
    time.sleep(10) 
+0

Да, я сделал это как обход, как упоминалось в вопросе. В частности, я ищу идиому python, которая позволяет избежать использования дополнительной очереди, которая, как оказалось, может оказаться невозможной. См. Комментарии к ответу Тима. –