У меня есть код python, который использует многопроцессорную карту пула. Я роняю нескольких детей с карты, каждый из них читает отдельный файл, и я собираю их в конце. Моя цель состоит в том, чтобы в конечном итоге был создан кадр данных pandas, который представляет собой конкатенацию всего выходного файла из детей с дублированием. Я использую этот dataframe для большей обработки (остальная часть кода кажется не связанной с вопросом, который я задаю здесь, поэтому я просто не использую эту часть для краткости). Этот код периодически запускается в конце недели, когда новые входные файлы читаются каждый раз. Иногда есть ошибки в файлах, которые читают дети, например, нулевые значения в целых столбцах или отсутствующие файлы и т. Д. Если какая-либо из этих ошибок возникает, я хочу, чтобы основной скрипт умирал, в идеале, как можно скорее. Я не знаю, как сделать это самым эффективным способом.Ловля ошибок в MultiProcessing Pool Map
Я попытался, в свою очередь: 1-Приведение ребенка к смерти при подъеме SystemExit (1), если оно встретит ошибку. Я не мог заставить родителя умереть. 2-Создание дочернего элемента возвращает пустое значение или фрейм данных pandas в случае ошибки, попробуйте исключить блоки. Я не мог обнаружить его правильно в родительском. 3-Использование map_async с функциями обратного вызова вместо карты.
Последний, похоже, работает. Однако я не уверен, что это правильный и наиболее эффективный способ сделать это, поскольку я не использую никакого вывода из функции обратного вызова ошибки. Любые комментарии и предложения приветствуются.
Edit:
Пример входного файла: a.txt:
shipmentId,processing_time_epoch
4001,1455408024132
4231,1455408024373
b.txt:
shipmentId,processing_time_epoch
5001,1455408024132
4231,1455408024373
Желаемая окончательный PROCESSING_TIME панды dataframe:
shipmentId,processing_time_epoch
4001,1455408024132
4231,1455408024373
5001,1455408024132
Мои код:
import pandas as pd
import csv,glob,datetime,sys,pdb,subprocess,multiprocessing,io,os,shlex
from itertools import repeat
def myerrorcallback(x):
print('There seems to be an error in the child. Parent: Please die.')
return
def mycallback(x):
print('Returned successfully.')
return
def PrintException():
exc_type, exc_obj, tb = sys.exc_info()
f = tb.tb_frame
lineno = tb.tb_lineno
filename = f.f_code.co_filename
print('EXCEPTION IN ({}, LINE {}): {} ({})'.format(filename, lineno, exc_obj, exc_type))
return
# ===================================================================
def Read_Processing_Times_v1(full_path_name):
try:
df = pd.read_csv(full_path_name,dtype={'shipmentId': pd.np.int64, 'processing_time_epoch': pd.np.int64}, usecols=['shipmentId','processing_time_epoch'])
return df.drop_duplicates()
except:
print("exception in file "+full_path_name)
PrintException()
raise(SystemExit(1))
# ===================================================================
def Read_Processing_Times_v2(full_path_name):
try:
df = pd.read_csv(full_path_name,dtype={'shipmentId': pd.np.int64, 'processing_time_epoch': pd.np.int64}, usecols=['shipmentId','processing_time_epoch'])
return df.drop_duplicates()
except:
print("exception in file "+full_path_name)
PrintException()
return pd.DataFrame()
# ===================================================================
def Read_Processing_Times_v3(full_path_name):
df = pd.read_csv(full_path_name,dtype={'shipmentId': pd.np.int64,'processing_time_epoch': pd.np.int64}, usecols=['shipmentId','processing_time_epoch'])
return df.drop_duplicates()
# ===========================================================================================================================
# Top-level
if __name__ == '__main__':
mycols = ['shipmentId', 'processing_time_epoch']
mydtypes = {'shipmentId': pd.np.int64, 'processing_time_epoch': pd.np.int64}
# The following two files should not give an error:
# files_to_read=["a.txt","b.txt"]
# The following two files should give an error, as a2.txt does not exist:
files_to_read=["a2.txt","b.txt"]
# version 1: Works with the correct files. Does not work if one of the children has an error: the child dies, the parent does not and waits forever.
# print("version 1")
# pool = multiprocessing.Pool(15)
# processing_times = pool.map(Read_Processing_Times_v1, files_to_read)
# pool.close()
# pool.join()
# processing_times = pd.concat(processing_times,ignore_index=True).drop_duplicates()
# print(processing_times)
# version 2: Does not work. Don't know how to fix it. The idea is make child return something, and catch the error in the parent.
# print("version 2")
# pool = multiprocessing.Pool(15)
# processing_times = pool.map(Read_Processing_Times_v2, files_to_read)
# pool.close()
# pool.join()
# if(processing_times.count(pd.DataFrame()) > 0):
# print("SLAM times are not read properly.")
# raise SystemExit(1)
# version 3:
print("version 3")
pool = multiprocessing.Pool(15)
processing_times = pool.map_async(Read_Processing_Times_v3, files_to_read,callback=mycallback,error_callback=myerrorcallback)
pool.close()
pool.join()
processing_times = processing_times.get()
processing_times = pd.concat(processing_times,ignore_index=True).drop_duplicates()
print("success!")
# Do more processing with processing_times after this line...
Кстати, мой питон версия 3.3. – ozzy
что вы хотите сделать точно? пожалуйста, объясните свой ожидаемый результат –
отредактировал! Дайте мне знать, если теперь будет более ясно – ozzy