2016-02-15 3 views
1

У меня есть код python, который использует многопроцессорную карту пула. Я роняю нескольких детей с карты, каждый из них читает отдельный файл, и я собираю их в конце. Моя цель состоит в том, чтобы в конечном итоге был создан кадр данных pandas, который представляет собой конкатенацию всего выходного файла из детей с дублированием. Я использую этот dataframe для большей обработки (остальная часть кода кажется не связанной с вопросом, который я задаю здесь, поэтому я просто не использую эту часть для краткости). Этот код периодически запускается в конце недели, когда новые входные файлы читаются каждый раз. Иногда есть ошибки в файлах, которые читают дети, например, нулевые значения в целых столбцах или отсутствующие файлы и т. Д. Если какая-либо из этих ошибок возникает, я хочу, чтобы основной скрипт умирал, в идеале, как можно скорее. Я не знаю, как сделать это самым эффективным способом.Ловля ошибок в MultiProcessing Pool Map

Я попытался, в свою очередь: 1-Приведение ребенка к смерти при подъеме SystemExit (1), если оно встретит ошибку. Я не мог заставить родителя умереть. 2-Создание дочернего элемента возвращает пустое значение или фрейм данных pandas в случае ошибки, попробуйте исключить блоки. Я не мог обнаружить его правильно в родительском. 3-Использование map_async с функциями обратного вызова вместо карты.

Последний, похоже, работает. Однако я не уверен, что это правильный и наиболее эффективный способ сделать это, поскольку я не использую никакого вывода из функции обратного вызова ошибки. Любые комментарии и предложения приветствуются.

Edit:

Пример входного файла: a.txt:

shipmentId,processing_time_epoch 
4001,1455408024132 
4231,1455408024373 

b.txt:

shipmentId,processing_time_epoch 
5001,1455408024132 
4231,1455408024373 

Желаемая окончательный PROCESSING_TIME панды dataframe:

shipmentId,processing_time_epoch 
4001,1455408024132 
4231,1455408024373 
5001,1455408024132 

Мои код:

import pandas as pd 
import csv,glob,datetime,sys,pdb,subprocess,multiprocessing,io,os,shlex 
from itertools import repeat 

def myerrorcallback(x): 
    print('There seems to be an error in the child. Parent: Please die.') 
    return 

def mycallback(x): 
    print('Returned successfully.') 
    return 

def PrintException(): 
    exc_type, exc_obj, tb = sys.exc_info() 
    f = tb.tb_frame 
    lineno = tb.tb_lineno 
    filename = f.f_code.co_filename 
    print('EXCEPTION IN ({}, LINE {}): {} ({})'.format(filename, lineno, exc_obj, exc_type)) 
    return 

# =================================================================== 
def Read_Processing_Times_v1(full_path_name): 
    try: 
     df = pd.read_csv(full_path_name,dtype={'shipmentId': pd.np.int64, 'processing_time_epoch': pd.np.int64}, usecols=['shipmentId','processing_time_epoch']) 
     return df.drop_duplicates() 
    except: 
     print("exception in file "+full_path_name) 
     PrintException() 
     raise(SystemExit(1)) 

# =================================================================== 
def Read_Processing_Times_v2(full_path_name): 
    try: 
     df = pd.read_csv(full_path_name,dtype={'shipmentId': pd.np.int64, 'processing_time_epoch': pd.np.int64}, usecols=['shipmentId','processing_time_epoch']) 
     return df.drop_duplicates() 
    except: 
     print("exception in file "+full_path_name) 
     PrintException() 
    return pd.DataFrame() 


# =================================================================== 
def Read_Processing_Times_v3(full_path_name): 
    df = pd.read_csv(full_path_name,dtype={'shipmentId': pd.np.int64,'processing_time_epoch': pd.np.int64}, usecols=['shipmentId','processing_time_epoch']) 
    return df.drop_duplicates() 

# =========================================================================================================================== 
# Top-level 
if __name__ == '__main__': 

    mycols = ['shipmentId', 'processing_time_epoch'] 
    mydtypes = {'shipmentId': pd.np.int64, 'processing_time_epoch': pd.np.int64} 

    # The following two files should not give an error: 
    # files_to_read=["a.txt","b.txt"] 

    # The following two files should give an error, as a2.txt does not exist: 
    files_to_read=["a2.txt","b.txt"] 

    # version 1: Works with the correct files. Does not work if one of the children has an error: the child dies, the parent does not and waits forever. 
    # print("version 1") 
    # pool = multiprocessing.Pool(15) 
    # processing_times = pool.map(Read_Processing_Times_v1, files_to_read) 
    # pool.close() 
    # pool.join() 
    # processing_times = pd.concat(processing_times,ignore_index=True).drop_duplicates() 
    # print(processing_times) 


    # version 2: Does not work. Don't know how to fix it. The idea is make child return something, and catch the error in the parent. 
    # print("version 2") 
    # pool = multiprocessing.Pool(15) 
    # processing_times = pool.map(Read_Processing_Times_v2, files_to_read) 
    # pool.close() 
    # pool.join() 
    # if(processing_times.count(pd.DataFrame()) > 0): 
    #  print("SLAM times are not read properly.") 
    #  raise SystemExit(1) 

    # version 3: 
    print("version 3") 
    pool = multiprocessing.Pool(15) 
    processing_times = pool.map_async(Read_Processing_Times_v3, files_to_read,callback=mycallback,error_callback=myerrorcallback) 
    pool.close() 
    pool.join() 
    processing_times = processing_times.get() 
    processing_times = pd.concat(processing_times,ignore_index=True).drop_duplicates() 


    print("success!") 

    # Do more processing with processing_times after this line... 
+0

Кстати, мой питон версия 3.3. – ozzy

+0

что вы хотите сделать точно? пожалуйста, объясните свой ожидаемый результат –

+0

отредактировал! Дайте мне знать, если теперь будет более ясно – ozzy

ответ

0

Я думаю, что вы могли бы выполнить то, что хотите, используя модуль concurrent.futures (https://docs.python.org/3/library/concurrent.futures.html). Ниже приведен пример страницы документа, который я модифицировал, чтобы быть ближе к вашей проблеме. В примере, если work_func возвращает False, что считается ошибкой, и программа завершится.

import sys 
import concurrent.futures 
import random 
import time 


def work_func(input_val): 
    """ 
    Do some work. Here a False value would mean there is an error 
    """ 
    time.sleep(0.5) 
    return random.choice([True, True, True, True, False]) 


if __name__ == "__main__": 
    # We can use a with statement to ensure processes are cleaned up promptly 
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor: 
     # Start the load operations and mark each future with its input value 
     future_to_result = {executor.submit(work_func, val): val for val in range(30)} 

     # iterate over the futures as they become available 
     for future in concurrent.futures.as_completed(future_to_result): 

      # get the input value from the dict 
      input_val = future_to_result[future] 

      # now retrieve the result from the future 
      try: 
       data = future.result() 
      except Exception as exc: 
       print(input_val, data) 
       print('Something exceptional happend') 
      else: 
       print(input_val, data) 
       if not data: 
        print('Error - exiting') 
        sys.exit(1) 

Пример вывода:

0 True 
1 True 
2 True 
3 False 
Error - exiting