2014-10-03 1 views
30

Я использовал rosetta.parallel.pandas_easy распараллелить применять после того, как группы, например:Распараллеливать применяются после панд GroupBy

from rosetta.parallel.pandas_easy import groupby_to_series_to_frame 
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2']) 
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index) 

Однако, кто-нибудь понял, как распараллелить функцию, которая возвращает dataframe? Этот код не работает для rosetta, как и ожидалось.

def tmpFunc(df): 
    df['c'] = df.a + df.b 
    return df 

df.groupby(df.index).apply(tmpFunc) 
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index) 

ответ

55

Это похоже на работу, хотя это действительно должно быть встроено в панде

import pandas as pd 
from joblib import Parallel, delayed 
import multiprocessing 

def tmpFunc(df): 
    df['c'] = df.a + df.b 
    return df 

def applyParallel(dfGrouped, func): 
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped) 
    return pd.concat(retLst) 

if __name__ == '__main__': 
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2']) 
    print 'parallel version: ' 
    print applyParallel(df.groupby(df.index), tmpFunc) 

    print 'regular version: ' 
    print df.groupby(df.index).apply(tmpFunc) 

    print 'ideal version (does not work): ' 
    print df.groupby(df.index).applyParallel(tmpFunc) 
+0

Знаете ли вы, был ли прогресс в включении распараллеливания в панды? – NumenorForLife

+1

Делая небольшие изменения в функции может быть сделано, чтобы вернуть иерархический индекс, что регулярные применить возвращает: определение функции temp_func (FUNC, имя, группа): возвращение FUNC (группа), имя Защиту applyParallel (dfGrouped , func): retLst, top_index = zip (* Параллель (n_jobs = multiprocessing.cpu_count()) (delayed (temp_func) (func, name, group) для имени, группы в dfGrouped)) return pd.concat (retLst, keys = top_index) ' Dang, я не могу понять, как отправлять комментарии в комментариях ... – BoZenKhaa

+2

@ jsc123: есть [dask] (https://github.com/blaze/dask) – paulochf

10

У меня есть хак, который я использую для получения распараллеливания в Пандах. Я разбиваю свою фреймворк на куски, помещаю каждый фрагмент в элемент списка, а затем использую параллельные биты ipython для параллельного применения в списке данных. Затем я вернул список вместе, используя функцию pandas concat.

В общем случае это не применимо. Это работает для меня, потому что функция, которую я хочу применить к каждому фрагменту кадра данных, занимает около минуты. И вытягивание и сбор моих данных не так долго. Таким образом, это, безусловно, куд. С учетом сказанного, вот пример. Я использую IPython ноутбук, так что вы будете видеть %%time магию в моем коде:

## make some example data 
import pandas as pd 

np.random.seed(1) 
n=10000 
df = pd.DataFrame({'mygroup' : np.random.randint(1000, size=n), 
        'data' : np.random.rand(n)}) 
grouped = df.groupby('mygroup') 

В этом примере я собираюсь сделать «ломти» на основе выше GroupBy, но это не должно быть, как данные чередуются. Хотя это довольно распространенная картина.

dflist = [] 
for name, group in grouped: 
    dflist.append(group) 

установить параллельные биты

from IPython.parallel import Client 
rc = Client() 
lview = rc.load_balanced_view() 
lview.block = True 

написать глупую функцию, чтобы обратиться к нашим данным

def myFunc(inDf): 
    inDf['newCol'] = inDf.data ** 10 
    return inDf 

теперь давайте запускать код в последовательный, то параллельно. серийный первый:

%%time 
serial_list = map(myFunc, dflist) 
CPU times: user 14 s, sys: 19.9 ms, total: 14 s 
Wall time: 14 s 

в настоящее время параллельно

%%time 
parallel_list = lview.map(myFunc, dflist) 

CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s 
Wall time: 1.56 s 

, то это займет всего несколько миллисекунд, чтобы объединить их обратно в одну dataframe

%%time 
combinedDf = pd.concat(parallel_list) 
CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms 
Wall time: 300 ms 

Я бегу 6 двигателей IPython на мой MacBook, но вы можете увидеть, что он сокращает время выполнения до 2 с от 14 секунд.

Для действительно долгого стохастического моделирования я могу использовать AWS-бэкэнд, создав кластер с StarCluster. Однако большую часть времени я распараллеливаю только 8 процессоров на моем MBP.

+0

Я попробую это с моим кодом, спасибо. Можете ли вы объяснить мне, почему применение не автоматически распараллеливает операции? Похоже, что вся польза от функции приложения заключается в том, чтобы избежать цикла, но если он не делает этого с этими группами, что дает? – robertevansanders

+1

Есть длинная история о том, что распараллеливание сложно в Python из-за GIL. Имейте в виду, что применение обычно представляет собой синтаксический сахар, а под ним - подразумеваемый цикл. Использование распараллеливания является несколько сложным, потому что затраты времени на выполнение параллелизуются, что иногда отрицает преимущества распараллеливания. –

+0

Есть ли отсутствующее определение для 'parallel_list', поскольку оно дает ошибку' name 'parallel_list' не определено' в этой строке: 'объединенныйDf = pd.concat (parallel_list)'? – Primer

29

ответ Иван велик, но, похоже, это может быть немного упрощено, а также избавляет от необходимости зависеть от joblib:

from multiprocessing import Pool, cpu_count 

def applyParallel(dfGrouped, func): 
    with Pool(cpu_count()) as p: 
     ret_list = p.map(func, [group for name, group in dfGrouped]) 
    return pandas.concat(ret_list) 

Кстати: это не может заменить любой groupby.apply(), но она будет охватывать типичные случаи: например,он должен охватывать случаи 2 и 3 in the documentation, тогда как вы должны получить поведение случая 1, передав аргумент axis=1 в окончательный вызов pandas.concat().

+0

Когда я запустил его с помощью REPL, я получаю сообщение об ошибке '_pickle.PicklingError: не удается pickle : поиск атрибута tmpFunc на __main__ failed', но как я могу это сделать с REPL? – Keiku

+0

@ Keiku не знаю, я никогда не слышал о REPL раньше ... но вы пытались с помощью '' func = lambda x: x? Если это тоже не сработает, я предлагаю вам открыть конкретный вопрос. способный воспроизводить только с помощью '' applyParallel ([('one', 1), ('two', 2)], your_func) '' –

+0

Спасибо за предложение. Кажется, я попытался перезапустить консоль и разрешить ее. бедненький. – Keiku

0

Короткий комментарий, чтобы сопровождать ответ JD Long. Я обнаружил, что если количество групп очень велико (скажем, сотни тысяч), и ваша прикладная функция делает что-то довольно простое и быстрое, а затем разбивает ваш блок данных на куски и назначает каждому куску работнику для выполнения groupby-apply (в последовательном порядке) может быть намного быстрее, чем выполнять параллельную групповую подачу заявки, а рабочие считывают очередь, содержащую множество групп. Пример:

import pandas as pd 
import numpy as np 
import time 
from concurrent.futures import ProcessPoolExecutor, as_completed 

nrows = 15000 
np.random.seed(1980) 
df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))}) 

Таким образом, наш dataframe выглядит следующим образом:

a 
0 3425 
1 1016 
2 8141 
3 9263 
4 8018 

Обратите внимание, что колонки 'а' имеет много групп (думают, идентификаторы клиента):

len(df.a.unique()) 
15000 

Функции для работы на нашем группы:

def f1(group): 
    time.sleep(0.0001) 
    return group 

Начало действия оол:

ppe = ProcessPoolExecutor(12) 
futures = [] 
results = [] 

ли параллельный GroupBy применить:

%%time 

for name, group in df.groupby('a'): 
    p = ppe.submit(f1, group) 
    futures.append(p) 

for future in as_completed(futures): 
    r = future.result() 
    results.append(r) 

df_output = pd.concat(results) 
del ppe 

CPU times: user 18.8 s, sys: 2.15 s, total: 21 s 
Wall time: 17.9 s 

Давайте теперь добавим столбец, который разбивающий ФР в гораздо меньше групп:

df['b'] = np.random.randint(0, 12, nrows) 

Теперь вместо 15000 групп есть составляют только 12:

len(df.b.unique()) 
12 

Мы разделим наш df и выполним групповое применение на каждом фрагменте.

ppe = ProcessPoolExecutor(12) 

Упаковочные весело:

def f2(df): 
    df.groupby('a').apply(f1) 
    return df 

Отсылает каждый кусок должен работать на в последовательном:

%%time 

for i in df.b.unique(): 
    p = ppe.submit(f2, df[df.b==i]) 
    futures.append(p) 

for future in as_completed(futures): 
    r = future.result() 
    results.append(r) 

df_output = pd.concat(results) 

CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s 
Wall time: 12.4 s 

Обратите внимание, что количество времени тратит на группу не изменилось. Скорее, что изменилось, это длина очереди, из которой считаются рабочие. Я подозреваю, что происходит то, что рабочие не могут одновременно обращаться к разделяемой памяти и постоянно возвращаются к чтению очереди, и поэтому наступают друг на друга. С более крупными кусками, чтобы работать, рабочие возвращаются реже, и поэтому эта проблема улучшается, а общее выполнение выполняется быстрее.