Я заинтересован в создании пула работников, the_pool
, используя multiprocessing.Pool
, который использует связь Queue
. Однако у каждого работника есть аргумент role
, который является уникальным для этого работника и должен быть предоставлен при инициализации работника. Это ограничение навязывается API, с которым я взаимодействую, и поэтому его нельзя обойти. Если бы я не нуждался в очереди, я мог бы просто перебрать список role
значений и вызывать apply_async
, например, так:Как передать переменные инициализации для конкретного работника в рабочий пул, если аргументы содержат очередь?
[the_pool.apply_async(worker_main, role) for role in roles]
К сожалению, Queue
объект может быть передан только в бассейнах во время бассейна конкретизации, как:
the_pool = multiprocessing.Pool(3, worker_main, (the_queue,))
Попытка передать Queue
через аргументы apply_async
вызывает ошибку во время выполнения. В следующем примере, адаптированном из this question, мы пытаемся создать пул из трех рабочих. Но пример не работает, потому что нет способа получить элемент ролей от roles
до initargs
для пула.
import os
import time
import multiprocessing
# A dummy function representing some fixed functionality.
def do_something(x):
print('I got a thing:', x)
# A main function, run by our workers. (Remove role arg for working example)
def worker_main(queue, role):
print('The worker at', os.getpid(), 'has role', role, ' and is initialized.')
# Use role in some way. (Comment out for working example)
do_something(role)
while True:
# Block until something is in the queue.
item = queue.get(True)
print(item)
time.sleep(0.5)
if __name__ == '__main__':
# Define some roles for our workers.
roles = [1, 2, 3]
# Instantiate a Queue for communication.
the_queue = multiprocessing.Queue()
# Build a Pool of workers, each running worker_main.
# PROBLEM: Next line breaks - how do I pass one element of roles to each worker?
the_pool = multiprocessing.Pool(3, worker_main, (the_queue,))
# Iterate, sending data via the Queue.
[the_queue.put('Insert useful message here') for _ in range(5)]
worker_pool.close()
worker_pool.join()
time.sleep(10)
Один тривиальных Обходной включать в initargs
, который служит только для связи роли каждого работника второго Queue
и блокировать выполнение рабочих до тех пор, пока не получит свою роль с помощью этой очереди. Это, однако, вводит дополнительную очередь, которая не должна быть необходимой. Соответствующая документация - here. Руководство и советы очень приветствуются.
После рассмотрения вопроса и ответа Тима Петерс, я чувствую, что есть конфликт между началом рабочего на бассейн 3 рабочих, и желая делегировать роли к когда они начинаются. Похоже, вы либо хотите пул на роль, либо хотите, чтобы роль была передана как часть элемента очереди, чтобы каждый работник мог выполнять какую-либо роль. –
Justin - В случае, если вы не знаете, обратите внимание, что получение ответа на вопрос Python от Тима Петерса находится на одном уровне, чтобы получить его от Гвидо. Размышляйте над этим. –
@ DanFarrell, вы правы - я хочу, чтобы рабочие процессы были отделены от любого конкретного выбора 'role'. Использование одного пула на роль не будет делать трюк, потому что мне нужно, чтобы пул был разделен по ролям по архитектурным причинам. В настоящее время я передаю значение роли через отдельный объект 'Queue', как вы предложили, в качестве рабочего процесса. Я блокирую выполнение метода init init, пока не будет получено значение «role», а затем сразу же попадет в 'main'.Я надеюсь, что кто-то разработал питоническую идиому, чтобы избежать этой избыточной очереди/блокировки. –