Короткий комментарий, чтобы сопровождать ответ JD Long. Я обнаружил, что если количество групп очень велико (скажем, сотни тысяч), и ваша прикладная функция делает что-то довольно простое и быстрое, а затем разбивает ваш блок данных на куски и назначает каждому куску работнику для выполнения groupby-apply (в последовательном порядке) может быть намного быстрее, чем выполнять параллельную групповую подачу заявки, а рабочие считывают очередь, содержащую множество групп. Пример:
import pandas as pd
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
nrows = 15000
np.random.seed(1980)
df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))})
Таким образом, наш dataframe выглядит следующим образом:
a
0 3425
1 1016
2 8141
3 9263
4 8018
Обратите внимание, что колонки 'а' имеет много групп (думают, идентификаторы клиента):
len(df.a.unique())
15000
Функции для работы на нашем группы:
def f1(group):
time.sleep(0.0001)
return group
Начало действия оол:
ppe = ProcessPoolExecutor(12)
futures = []
results = []
ли параллельный GroupBy применить:
%%time
for name, group in df.groupby('a'):
p = ppe.submit(f1, group)
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
del ppe
CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
Wall time: 17.9 s
Давайте теперь добавим столбец, который разбивающий ФР в гораздо меньше групп:
df['b'] = np.random.randint(0, 12, nrows)
Теперь вместо 15000 групп есть составляют только 12:
len(df.b.unique())
12
Мы разделим наш df и выполним групповое применение на каждом фрагменте.
ppe = ProcessPoolExecutor(12)
Упаковочные весело:
def f2(df):
df.groupby('a').apply(f1)
return df
Отсылает каждый кусок должен работать на в последовательном:
%%time
for i in df.b.unique():
p = ppe.submit(f2, df[df.b==i])
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
Wall time: 12.4 s
Обратите внимание, что количество времени тратит на группу не изменилось. Скорее, что изменилось, это длина очереди, из которой считаются рабочие. Я подозреваю, что происходит то, что рабочие не могут одновременно обращаться к разделяемой памяти и постоянно возвращаются к чтению очереди, и поэтому наступают друг на друга. С более крупными кусками, чтобы работать, рабочие возвращаются реже, и поэтому эта проблема улучшается, а общее выполнение выполняется быстрее.
Знаете ли вы, был ли прогресс в включении распараллеливания в панды? – NumenorForLife
Делая небольшие изменения в функции может быть сделано, чтобы вернуть иерархический индекс, что регулярные применить возвращает: определение функции temp_func (FUNC, имя, группа): возвращение FUNC (группа), имя Защиту applyParallel (dfGrouped , func): retLst, top_index = zip (* Параллель (n_jobs = multiprocessing.cpu_count()) (delayed (temp_func) (func, name, group) для имени, группы в dfGrouped)) return pd.concat (retLst, keys = top_index) ' Dang, я не могу понять, как отправлять комментарии в комментариях ... – BoZenKhaa
@ jsc123: есть [dask] (https://github.com/blaze/dask) – paulochf