2016-08-17 6 views
0

Я хочу использовать питон мультипроцессирование провести следующее:Python многопроцессорные: процесс список входных файлов с регистрацией ошибок

  • процесса длинный список входных файлы
  • включают протоколирование ошибок
  • множества А (количество процессов)

python logging cookbook имеет два отличных примера для многопроцессорной обработки. В приведенном ниже коде я изменил второй метод («вход в основной процесс, в отдельный поток»), который использует многопроцессорность. Queue. Как для себя, так и для новых пользователей, я добавил подробные заметки и создал пример ввода и вывода файлов.

Где я застрял в том, что код выполняет итерацию по количеству ядер процессора, а не по количеству элементов в моем списке.

Как я могу применить функцию ко всем моим входным файлам, не превышая лимит количества одновременных процессов?

import json 
import logging 
import multiprocessing 
import numpy as np 
import os 
import pandas as pd 
import threading 
import time 

def create_10_infiles(): 
    """Creates 10 csv files with 4x4 array of floats, + occasional strings""" 
    list_csv_in = [] 
    for i in range(1,11): 
     csv_in = "{:02d}_in.csv".format(i) 
     # create a 4 row, 4 column dataframe with random values centered around i 
     df = pd.DataFrame(np.random.rand(16).reshape(4,4) * i) 
     # add a string to one of the arrays (as a reason to need error logging) 
     if i == 2 or i == 8: 
      df.loc[2,2] = "Oops, array contains a string. Welcome to data science." 
     # save to csv, and append filename to list of inputfiles 
     df.to_csv(csv_in) 
     list_csv_in.append(csv_in) 
    return list_csv_in 

def logger_thread(queue): 
    """Listener process that logs output received from other processes?""" 
    while True: 
     record = queue.get() 
     if record is None: 
      break 
     logger = logging.getLogger(record.name) 
     logger.handle(record) 

def worker_process(queue, infile): 
    """Worker process that used to run tasks. 
    Each process is isolated, so it starts by setting up logging.""" 
    # set up a handle to hold the logger output? 
    queue_handle = logging.handlers.QueueHandler(queue) 
    # creates a new logger called "process logger" (printed in each line) 
    logger = logging.getLogger("process logger") 
    # sets the logging level to DEBUG, so logger.info messages are printed. 
    logger.setLevel(logging.DEBUG) 
    # connects logger to handle defined above? 
    logger.addHandler(queue_handle) 
    # here you can run your desired program, in the hope that the time saved from parallel 
    # processing is greater than the overhead of setting up all those processes and loggers:) 
    normalise_array_to_mean_and_save(infile, logger) 

def normalise_array_to_mean_and_save(csv_in, logger): 
    """Opens csv with array, checks dtypes, calculates mean, saves output csv.""" 
    # check if file exists 
    if os.path.isfile(csv_in): 
     # open as pandas dataframe 
     df = pd.read_csv(csv_in) 
     # if none of the columns contain mixed datatypes (i.e, a string) 
     if not pd.np.dtype('object') in df.dtypes.tolist(): 
      # calc mean over whole dataframe 
      mean = df.stack().mean() 
      logger.info("{}, Mean = {:0.2f}".format(csv_in, mean)) 
      # normalise all values to mean. Save as "01_out.csv", "02_out.csv" etc 
      df = df/mean 
      csv_out = csv_in[:-6] + "out.csv" 
      df.to_csv(csv_out) 
     else: 
      logger.info("{}, Mean not calculated. Non-float values found.".format(csv_in)) 

if __name__ == '__main__': 
    os.chdir(r"D:\data") 
    # import your favourite json logging settings (collapsed for brevity) 
    logsettings = json.dumps({"version": 1, "root": {"handlers": ["console", "file"], "level": "DEBUG"}, "formatters": {"detailed": {"class": "logging.Formatter", "format": "%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s"}}, "handlers": {"console": {"class": "logging.StreamHandler", "level": "DEBUG"}, "file": {"mode": "w", "formatter": "detailed", "class": "logging.FileHandler", "filename": "my_multiprocessing_logfile.log"}}}) 
    config = json.loads(logsettings) 
    # replace default logfile with a filename containing the exact time 
    config['handlers']['file']['filename'] = time.strftime("%Y%m%d_%H_%M_%S") + "_mp_logfile.txt" 
    # load the logging settings 
    logging.config.dictConfig(config) 

    queue = multiprocessing.Queue() 
    workers = [] 
    # set the number of concurrent processes created (i.e. CPU cores used) 
    num_processes = 4 

    # create 10 csv files with data, and return the list of filepaths 
    list_10_infiles = create_10_infiles() 

    # set up a process for each CPU core (e.g. 4) 
    for i in range(num_processes): 
     wp = multiprocessing.Process(target=worker_process, 
            name='worker_{}'.format(i+1), 
            args=(queue, list_10_infiles[i])) 
     workers.append(wp) 
     wp.start() 

    # set up a thread as the logger_process 
    logger_process = threading.Thread(target=logger_thread, args=(queue,)) 
    logger_process.start() 

    #At this point, the main process could do some useful work of its own 
    #Once it's done that, it can wait for the workers to terminate... 
    for wp in workers: 
     wp.join() 

    # set logger for main process if desired 
    root = logging.getLogger("main") 
    root.setLevel(logging.DEBUG) 
    logger = logging.getLogger("main logger") 
    logger.info("CPUs used = {}/{}".format(num_processes, multiprocessing.cpu_count())) 
    logger.info('Program is finished. All files analysed.') 

    # And now tell the logging thread to finish up, too 
    queue.put(None) 
    logger_process.join() 

Примечание: Я попытался делить список входных файлов на куски в зависимости от количества ядер ЦП. Это обработало файлы, но было очень медленным.

ответ

0

Я обнаружил, что использование пула многопроцессорности python вместо очереди позволяет обрабатывать длинный список файлов и ограничивать количество параллельных ядер.

Хотя ведение журнала несовместимо с пулом, я обнаружил, что можно получить возвращаемые значения. Возвращаемые значения могут регистрироваться после обработки всех файлов, если код не генерирует исключение.

Возможно, кто-то здесь может дать мне более элегантное решение, но на данный момент это решает проблему.

from multiprocessing import Pool 
from time import strftime 
import logging 

def function_to_process_files(file): 
    #..check file integrity, etc.. 
    if file_gives_an_error: 
     return "{} file {} gave an error".format(strftime("%Y%m%d_%H_%M_%S"), file) 
    #..do stuff without using the logging module.. 
    #.. for slow, irregular processes, printing to console is possible.. 
    return "{} file {} processed correctly".format(strftime("%Y%m%d_%H_%M_%S"), file) 

if __name__ == "__main__": 

    list_of_files_to_process = define_your_file_list_somehow() 

    logging = logging.setup_regular_logging_to_file_as_desired() 

    # define the number of CPU cores to be used concurrently 
    n_processes = 4 

    with Pool(processes=n_processes) as pool: 
     list_of_return_statements = pool.map(function_to_process_files, list_of_files_to_process) 
    # now transfer the list of return statements to the logfile 
    for return_statement in list_of_return_statements: 
     logging.info(return_statement) 

 Смежные вопросы

  • Нет связанных вопросов^_^