Python multiprocessing: process a list of input files with error logging

I want to use Python multiprocessing to do the following:
- process a long list of input files
- include error logging
- set N (the number of concurrent processes)

The Python logging cookbook has two excellent examples for multiprocessing. In the code below I have adapted the second method ("logging in the main process, in a separate thread"), which uses a multiprocessing.Queue. For my own benefit and for new users, I have added detailed comments and created example input and output files.

Where I am stuck is that the code iterates over the number of CPU cores, not over the number of items in my input list.

How can I apply the function to all of my input files, without exceeding the limit on the number of concurrent processes?
import json
import logging
import logging.config
import logging.handlers
import multiprocessing
import numpy as np
import os
import pandas as pd
import threading
import time
def create_10_infiles():
    """Creates 10 csv files with a 4x4 array of floats, plus occasional strings."""
    list_csv_in = []
    for i in range(1, 11):
        csv_in = "{:02d}_in.csv".format(i)
        # create a 4-row, 4-column dataframe with random values centred around i
        df = pd.DataFrame(np.random.rand(16).reshape(4, 4) * i)
        # add a string to some of the arrays (as a reason to need error logging)
        if i == 2 or i == 8:
            df.loc[2, 2] = "Oops, array contains a string. Welcome to data science."
        # save to csv, and append the filename to the list of input files
        df.to_csv(csv_in)
        list_csv_in.append(csv_in)
    return list_csv_in
def logger_thread(queue):
    """Listener thread (run in the main process) that handles log records
    received from the worker processes via the queue."""
    while True:
        record = queue.get()
        if record is None:
            break
        logger = logging.getLogger(record.name)
        logger.handle(record)
def worker_process(queue, infile):
    """Worker process that runs the task.
    Each process is isolated, so it starts by setting up its own logging."""
    # the QueueHandler sends every log record from this process to the shared queue
    queue_handle = logging.handlers.QueueHandler(queue)
    # create a new logger called "process logger" (printed in each line)
    logger = logging.getLogger("process logger")
    # set the logging level to DEBUG, so logger.info messages are printed
    logger.setLevel(logging.DEBUG)
    # attach the QueueHandler defined above to the logger
    logger.addHandler(queue_handle)
    # here you can run your desired program, in the hope that the time saved from parallel
    # processing is greater than the overhead of setting up all those processes and loggers :)
    normalise_array_to_mean_and_save(infile, logger)
def normalise_array_to_mean_and_save(csv_in, logger):
    """Opens csv with array, checks dtypes, calculates mean, saves output csv."""
    # check if the file exists
    if os.path.isfile(csv_in):
        # open as a pandas dataframe
        df = pd.read_csv(csv_in)
        # if none of the columns contain mixed datatypes (i.e. a string)
        if np.dtype('object') not in df.dtypes.tolist():
            # calculate the mean over the whole dataframe
            mean = df.stack().mean()
            logger.info("{}, Mean = {:0.2f}".format(csv_in, mean))
            # normalise all values to the mean. Save as "01_out.csv", "02_out.csv" etc.
            df = df / mean
            csv_out = csv_in[:-6] + "out.csv"
            df.to_csv(csv_out)
        else:
            logger.info("{}, Mean not calculated. Non-float values found.".format(csv_in))
if __name__ == '__main__':
    os.chdir(r"D:\data")

    # import your favourite json logging settings (collapsed for brevity)
    logsettings = json.dumps({"version": 1, "root": {"handlers": ["console", "file"], "level": "DEBUG"}, "formatters": {"detailed": {"class": "logging.Formatter", "format": "%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s"}}, "handlers": {"console": {"class": "logging.StreamHandler", "level": "DEBUG"}, "file": {"mode": "w", "formatter": "detailed", "class": "logging.FileHandler", "filename": "my_multiprocessing_logfile.log"}}})
    config = json.loads(logsettings)
    # replace the default logfile with a filename containing the exact time
    config['handlers']['file']['filename'] = time.strftime("%Y%m%d_%H_%M_%S") + "_mp_logfile.txt"
    # load the logging settings
    logging.config.dictConfig(config)

    queue = multiprocessing.Queue()
    workers = []
    # set the number of concurrent processes created (i.e. CPU cores used)
    num_processes = 4
    # create 10 csv files with data, and return the list of filepaths
    list_10_infiles = create_10_infiles()

    # set up a process for each CPU core (e.g. 4)
    for i in range(num_processes):
        wp = multiprocessing.Process(target=worker_process,
                                     name='worker_{}'.format(i + 1),
                                     args=(queue, list_10_infiles[i]))
        workers.append(wp)
        wp.start()

    # set up a thread as the logging listener
    logger_process = threading.Thread(target=logger_thread, args=(queue,))
    logger_process.start()

    # At this point, the main process could do some useful work of its own.
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()

    # set up a logger for the main process if desired
    root = logging.getLogger("main")
    root.setLevel(logging.DEBUG)
    logger = logging.getLogger("main logger")
    logger.info("CPUs used = {}/{}".format(num_processes, multiprocessing.cpu_count()))
    logger.info('Program is finished. All files analysed.')

    # And now tell the logging thread to finish up, too
    queue.put(None)
    logger_process.join()
Note: I have tried splitting the list of input files into chunks based on the number of CPU cores. That processed all the files, but it was very slow.
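Roughly, that chunking attempt replaced the per-core loop in the __main__ block with something like the sketch below (not my exact code; worker_process_chunk here is just a variant of worker_process that loops over a list of files):

def worker_process_chunk(queue, chunk):
    """Variant of worker_process that handles a whole chunk of files."""
    # same per-process logging setup as in worker_process above
    queue_handle = logging.handlers.QueueHandler(queue)
    logger = logging.getLogger("process logger")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(queue_handle)
    # process every file in the chunk sequentially
    for infile in chunk:
        normalise_array_to_mean_and_save(infile, logger)

# inside the __main__ block: split the 10 files into num_processes chunks
chunks = [list_10_infiles[i::num_processes] for i in range(num_processes)]
for i, chunk in enumerate(chunks):
    wp = multiprocessing.Process(target=worker_process_chunk,
                                 name='worker_{}'.format(i + 1),
                                 args=(queue, chunk))
    workers.append(wp)
    wp.start()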