Благодарим за предоставление рабочего кода. Я изменил это, чтобы получить некоторое представление, а затем создал модифицированную версию с использованием многопроцессорной обработки.
Модифицированный потоковая версия
Все модификации просто, чтобы получить больше информации из, нет концептуальных изменений. Все идет в один файл mthread.py
и прокомментирован частично.
Импорт, как обычно:
import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import threading
import logging
write_samples
получил некоторую регистрацию:
def write_samples(store, samples, overwrite):
wslog = logging.getLogger("write_samples")
wslog.info("starting")
frame = pd.DataFrame(samples, dtype='float64')
if overwrite:
store.put("df", frame, format='table', index=False)
else:
store.append("df", frame, format='table', index=False)
wslog.info("finished")
begin_io
получил максимальную duaration, превысив, что результаты времени в записи журнала ПРЕДУПРЕЖДЕНИЕ:
def begin_io(maxduration=500):
iolog = logging.getLogger("begin_io")
iolog.info("starting")
try:
fname = "data/tab" + str(random.randint(0, 100)) + ".h5"
iolog.debug("opening store %s", fname)
with pd.HDFStore(fname, mode='w', complevel=0) as store:
iolog.debug("store %s open", fname)
counter = 0
while True:
data = np.random.rand(50000, 1)
start_time = timer()
write_samples(store, data, counter == 0)
end_time = timer()
duration = (end_time - start_time) * 1000
iolog.debug("IO Done : %s (%.2f ms, %d)",
datetime.datetime.now(),
duration,
counter)
if duration > maxduration:
iolog.warning("Long duration %s", duration)
counter += 1
except Exception:
iolog.exception("oops")
finally:
iolog.info("finished")
dummy_thread
получили изменен, чтобы d также испускает ПРЕДУПРЕЖДЕНИЕ, если занимает слишком много времени:
def dummy_thread(pill2kill, maxduration=500):
dtlog = logging.getLogger("dummy_thread")
dtlog.info("starting")
try:
previous = timer()
while not pill2kill.wait(0.01):
now = timer()
duration = (now - previous) * 1000
dtlog.info("Dummy Thread : %s (%d ms)",
datetime.datetime.now(),
duration)
if duration > maxduration:
dtlog.warning("Long duration %s", duration)
previous = now
dtlog.debug("stopped looping.")
except Exception:
dtlog.exception("oops")
finally:
dtlog.info("finished")
и, наконец, мы все называем. Не стесняйтесь изменять уровни журналов, WARNING
показывает только чрезмерное время, INFO
и DEBUG
рассказывают гораздо больше.
if __name__ == '__main__':
logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s'
logging.basicConfig(format=logformat,
level=logging.WARNING)
pill2kill = threading.Event()
t = threading.Thread(target=dummy_thread, args=(pill2kill, 500))
t.start()
try:
begin_io(500)
finally:
pill2kill.set()
t.join()
Выполнение кода я получаю результаты, как вы описали:
2016-04-08 15:29:11,428 [WARNING] - begin_io: Long duration 5169.03591156
2016-04-08 15:29:11,429 [WARNING] - dummy_thread: Long duration 5161.45706177
2016-04-08 15:29:27,305 [WARNING] - begin_io: Long duration 1447.40581512
2016-04-08 15:29:27,306 [WARNING] - dummy_thread: Long duration 1450.75201988
2016-04-08 15:29:32,893 [WARNING] - begin_io: Long duration 1610.98194122
2016-04-08 15:29:32,894 [WARNING] - dummy_thread: Long duration 1612.98394203
2016-04-08 15:29:34,930 [WARNING] - begin_io: Long duration 823.182821274
2016-04-08 15:29:34,930 [WARNING] - dummy_thread: Long duration 815.275907516
2016-04-08 15:29:43,640 [WARNING] - begin_io: Long duration 510.369062424
2016-04-08 15:29:43,640 [WARNING] - dummy_thread: Long duration 511.776924133
Из значений понятно, что в то время как begin_io
очень занят и delayd (вероятно, во время данных записываются на диск) , dummy_thread
также задерживается почти на такое же количество времени.
Версия с многопроцессорной - работает хорошо
Я изменил код для запуска в различных процессах и с тех пор он действительно не блокирует dummy_thread
.
2016-04-08 15:38:12,487 [WARNING] - begin_io: Long duration 755.397796631
2016-04-08 15:38:14,127 [WARNING] - begin_io: Long duration 1434.60512161
2016-04-08 15:38:15,725 [WARNING] - begin_io: Long duration 848.396062851
2016-04-08 15:38:24,290 [WARNING] - begin_io: Long duration 1129.17089462
2016-04-08 15:38:25,609 [WARNING] - begin_io: Long duration 1059.08918381
2016-04-08 15:38:31,165 [WARNING] - begin_io: Long duration 646.969079971
2016-04-08 15:38:37,273 [WARNING] - begin_io: Long duration 1699.17201996
2016-04-08 15:38:43,788 [WARNING] - begin_io: Long duration 1555.341959
2016-04-08 15:38:47,765 [WARNING] - begin_io: Long duration 639.196872711
2016-04-08 15:38:54,269 [WARNING] - begin_io: Long duration 1690.57011604
2016-04-08 15:39:06,397 [WARNING] - begin_io: Long duration 1998.33416939
2016-04-08 15:39:16,980 [WARNING] - begin_io: Long duration 2558.51006508
2016-04-08 15:39:21,688 [WARNING] - begin_io: Long duration 1132.73501396
2016-04-08 15:39:26,450 [WARNING] - begin_io: Long duration 876.784801483
2016-04-08 15:39:29,809 [WARNING] - begin_io: Long duration 709.135055542
2016-04-08 15:39:31,748 [WARNING] - begin_io: Long duration 677.506923676
2016-04-08 15:39:41,854 [WARNING] - begin_io: Long duration 770.184993744
код с многопроцессорной здесь:
import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import multiprocessing
import time
import logging
def write_samples(store, samples, overwrite):
wslog = logging.getLogger("write_samples")
wslog.info("starting")
frame = pd.DataFrame(samples, dtype='float64')
if overwrite:
store.put("df", frame, format='table', index=False)
else:
store.append("df", frame, format='table', index=False)
wslog.info("finished")
def begin_io(pill2kill, maxduration=500):
iolog = logging.getLogger("begin_io")
iolog.info("starting")
try:
fname = "data/tab" + str(random.randint(0, 100)) + ".h5"
iolog.debug("opening store %s", fname)
with pd.HDFStore(fname, mode='w', complevel=0) as store:
iolog.debug("store %s open", fname)
counter = 0
while not pill2kill.wait(0):
data = np.random.rand(50000, 1)
start_time = timer()
write_samples(store, data, counter == 0)
end_time = timer()
duration = (end_time - start_time) * 1000
iolog.debug("IO Done : %s (%.2f ms, %d)",
datetime.datetime.now(),
duration,
counter)
if duration > maxduration:
iolog.warning("Long duration %s", duration)
counter += 1
except Exception:
iolog.exception("oops")
finally:
iolog.info("finished")
def dummy_thread(pill2kill, maxduration=500):
dtlog = logging.getLogger("dummy_thread")
dtlog.info("starting")
try:
previous = timer()
while not pill2kill.wait(0.01):
now = timer()
duration = (now - previous) * 1000
dtlog.info("Dummy Thread : %s (%d ms)",
datetime.datetime.now(),
duration)
if duration > maxduration:
dtlog.warning("Long duration %s", duration)
previous = now
dtlog.debug("stopped looping.")
except Exception:
dtlog.exception("oops")
finally:
dtlog.info("finished")
if __name__ == '__main__':
logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s'
logging.basicConfig(format=logformat,
level=logging.WARNING)
pill2kill = multiprocessing.Event()
dp = multiprocessing.Process(target=dummy_thread, args=(pill2kill, 500,))
dp.start()
try:
p = multiprocessing.Process(target=begin_io, args=(pill2kill, 500,))
p.start()
time.sleep(100)
finally:
pill2kill.set()
dp.join()
p.join()
Выводы
записи данных в файл HDF5 действительно блокирует другие потоки и многопроцессорная версия требуется.
Если вы ожидаете, что dummy_thread
сделать некоторую реальную работу (например, сбор данных для хранения), и вы хотите отправить данные здесь сериализатором HDF5, вы должны какой-то обмен сообщениями - либо с помощью multiprocessing.Queue
, Pipe
или, возможно, использовать ZeroMQ (например, PUSH - PULL гнездо ). С ZeroMQ вы можете выполнять сохранение данных даже на другом компьютере.
РЕДАКТИРОВАТЬ/ПРЕДУПРЕЖДЕНИЕ: Предоставленный код может сэкономить данные некогда, я сделал это для измерения производительности и не сделал его водонепроницаемым. Когда Ctrl-C во время обработки, иногда я получаю поврежденный файл. Эта проблема я рассматриваю вне сферы этого вопроса (и проблема должна быть решена путем тщательной остановки текущего процесса).
Любой код? Как насчет того, чтобы сузить проблему до короткого простого скрипта, который мог бы написать ожидаемое количество некоторых фиктивных данных. Вы увидите, если он все еще страдает от одной и той же проблемы или работает хорошо. –
@JanVlcinsky Я воспроизвел его в сценарии, который просто постоянно добавляется к HDFStore. Я упрощу это, а затем опубликую здесь. – user3870920
@JanVlcinsky Добавлен код-пример – user3870920