2017-02-05 16 views
4

Я бы хотел, чтобы concurrent.futures.ProcessPoolExecutor.map() вызывал функцию, состоящую из 2 или более аргументов. В приведенном ниже примере я прибегал к использованию функции lambda и определял ref как массив равного размера numberlist с одинаковым значением.Как передать функцию с несколькими аргументами на python concurrent.futures.ProcessPoolExecutor.map()?

1-й вопрос: Есть ли лучший способ сделать это? В случае, когда размер списка номеров может составлять от миллиона до миллиарда элементов, следовательно, размер ссылки должен соответствовать списку номеров, этот подход излишне берет драгоценную память, чего я бы хотел избежать. Я сделал это, потому что я прочитал функцию map, чтобы завершить ее отображение до тех пор, пока не будет достигнут самый короткий конец массива.

import concurrent.futures as cf 

nmax = 10 
numberlist = range(nmax) 
ref = [5, 5, 5, 5, 5, 5, 5, 5, 5, 5] 
workers = 3 


def _findmatch(listnumber, ref):  
    print('def _findmatch(listnumber, ref):') 
    x='' 
    listnumber=str(listnumber) 
    ref = str(ref) 
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref)) 
    if ref in listnumber: 
     x = listnumber 
    print('x = {0}'.format(x)) 
    return x 

a = map(lambda x, y: _findmatch(x, y), numberlist, ref) 
for n in a: 
    print(n) 
    if str(ref[0]) in n: 
     print('match') 

with cf.ProcessPoolExecutor(max_workers=workers) as executor: 
    #for n in executor.map(_findmatch, numberlist): 
    for n in executor.map(lambda x, y: _findmatch(x, ref), numberlist, ref): 
     print(type(n)) 
     print(n) 
     if str(ref[0]) in n: 
      print('match') 

Запуск выше код, я обнаружил, что функция map удалось добиться моего желаемого результата. Однако, когда я передал те же условия для concurrent.futures.ProcessPoolExecutor.map(), python3.5 неудачу с этой ошибкой:

Traceback (most recent call last): 
    File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed 
    obj = ForkingPickler.dumps(obj) 
    File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps 
    cls(buf, protocol).dump(obj) 
_pickle.PicklingError: Can't pickle <function <lambda> at 0x7fd2a14db0d0>: attribute lookup <lambda> on __main__ failed 

Вопрос 2: Почему происходят эта ошибка и как я могу получить одновременно .futures.ProcessPoolExecutor.map() для вызова функции с более чем 1 аргументом?

ответ

3

Чтобы ответить на ваш второй вопрос, вы получаете исключение, потому что lambda функция как тот, который вы используете не пригодны для консервирования. Поскольку Python использует протокол pickle для сериализации данных, переданных между основным процессом и рабочими процессами ProcessPoolExecutor, это проблема. Непонятно, почему вы используете lambda. У лямбда у вас есть два аргумента, как и исходная функция. Вы можете использовать _findmatch напрямую, а не lambda, и он должен работать.

with cf.ProcessPoolExecutor(max_workers=workers) as executor: 
    for n in executor.map(_findmatch, numberlist, ref): 
     ... 

Что касается первого вопроса о передаче второго постоянного аргумента без создания гигантского списка, вы можете решить это несколькими способами.Один из подходов может состоять в том, чтобы использовать itertools.repeat для создания итеративного объекта, который повторяет то же значение навсегда при повторении.

Но лучший подход, вероятно, заключался бы в том, чтобы написать дополнительную функцию, которая передает вам постоянный аргумент. (Возможно, именно поэтому вы пытаетесь использовать lambda функцию?) Он должен работать, если функция используется доступна в пространстве имен верхнего уровня модуля:

def _helper(x): 
    return _findmatch(x, 5) 

with cf.ProcessPoolExecutor(max_workers=workers) as executor: 
    for n in executor.map(_helper, numberlist): 
     ... 
+0

Вы правы, я прибегал к экспериментированию с lambda, потому что изначально у меня возникла проблема с передачей функции с двумя аргументами в 'executor', когда' ref' был константой. После преобразования 'ref' в список, равный размеру' numberlist', я просто понял, что забыл удалить лямбда. То, что я действительно хотел, было решением, где 'ref' является постоянным или похожим. Таким образом, вспомогательная функция & itertools.repeat', которую вы упомянули, работает. Спасибо. –

+0

Я хотел бы предложить вам ответить на мой [последующий вопрос] (http://stackoverflow.com/q/42074501/5722359), где я сравнивал производительность «Executor.map» с «Executor.submit» и нашел бывший значительно медленнее, и мне нравится знать, почему? –

1

Что касается вашего первого вопроса, правильно ли я понимаю, что вы хотите передать аргумент, значение которого определяется только в момент, когда вы вызываете map, но постоянным для всех экземпляров отображаемой функции? Если да, то я хотел бы сделать map функцией, полученной из «шаблона функции» со вторым аргументом (ref в вашем примере) запеченной в него с помощью functools.partial:

from functools import partial 
refval = 5 

def _findmatch(ref, listnumber): # arguments swapped 
    ... 

with cf.ProcessPoolExecutor(max_workers=workers) as executor: 
    for n in executor.map(partial(_findmatch, refval), numberlist): 
     ... 

Re. вопрос 2, первая часть: я не нашел точный фрагмент кода, который пытается рассортировать (сериализовать) функцию, которая затем должна выполняться параллельно, но кажется естественным, что это должно произойти - не только аргументы, но и функция должна быть передана работникам, и это, вероятно, должно быть сериализовано для этой передачи. Тот факт, что функции partial можно мариновать, а lambda s не может упоминаться в другом месте, например здесь: https://stackoverflow.com/a/19279016/6356764.

Re. вопрос 2, вторая часть: если вы хотите вызвать функцию с несколькими аргументами в ProcessPoolExecutor.map, вы передадите ей функцию как первый аргумент, за которой последует итерабельность первых аргументов для функции, за которой следует итерабельность ее второй аргументы и т.д. в вашем случае:

for n in executor.map(_findmatch, numberlist, ref): 
    ... 
+0

Спасибо за обмен. :) Ваши решения сработали. Это мое первое время, узнав о частичном тоже. –

+0

Я хотел бы предложить вам ответить на мой [последующий вопрос] (http://stackoverflow.com/q/42074501/5722359), где я сравнивал производительность «Executor.map» с «Executor.submit» и нашел бывший значительно медленнее, и мне нравится знать, почему? –

2

(1) Нет необходимости, чтобы сделать список. Вы можете использовать itertools.repeat для создания итератора, который просто повторяет некоторое значение.

(2) Вам необходимо передать именованную функцию в map, потому что она будет передана подпроцессу для выполнения. map использует протокол рассола для отправки вещей, лямбды не могут быть маринованными, и поэтому они не могут быть частью карты. Но это совершенно не нужно. Все ваши лямбды сделали вызов функции параметров 2 с двумя параметрами. Удалите его полностью.

Рабочий код

import concurrent.futures as cf 
import itertools 

nmax = 10 
numberlist = range(nmax) 
workers = 3 

def _findmatch(listnumber, ref):  
    print('def _findmatch(listnumber, ref):') 
    x='' 
    listnumber=str(listnumber) 
    ref = str(ref) 
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref)) 
    if ref in listnumber: 
     x = listnumber 
    print('x = {0}'.format(x)) 
    return x 

with cf.ProcessPoolExecutor(max_workers=workers) as executor: 
    #for n in executor.map(_findmatch, numberlist): 
    for n in executor.map(_findmatch, numberlist, itertools.repeat(5)): 
     print(type(n)) 
     print(n) 
     #if str(ref[0]) in n: 
     # print('match') 
+0

Спасибо за объяснение и решение. :) –

+0

Я хотел бы предложить вам ответить на мой [следующий вопрос] (http://stackoverflow.com/q/42074501/5722359), где я сравнивал производительность «Executor.map» с «Executor.submit» и нашел первый значительно медленнее, и мне нравится знать, почему? –

 Смежные вопросы

  • Нет связанных вопросов^_^