2016-04-07 5 views
3

У меня есть приложение выборки, который получает 250,000 выборок в секунду, буферизует их в памяти и в конечном итоге присоединяет к HDFStore, предоставленной pandas. В общем, это здорово. Тем не менее, у меня есть поток, который запускает и постоянно опорожняет устройство сбора данных (DAQ), и он должен работать на регулярной основе. Отклонение около секунды имеет тенденцию нарушать вещи. Ниже приведен крайний случай наблюдаемых таймингов. Start указывает на начало DAQ, Finish - это когда он заканчивается, и IO указывает запись HDF (оба DAQ и IO встречаются в отдельных потоках).ГИЛ для IO ограничена нить в расширении C (HDF5)

Start  : 2016-04-07 12:28:22.241303 
IO (1)  : 2016-04-07 12:28:22.241303 
Finish  : 2016-04-07 12:28:46.573440 (0.16 Hz, 24331.26 ms) 
IO Done (1) : 2016-04-07 12:28:46.573440 (24332.39 ms) 

Как вы можете видеть, для выполнения этой записи требуется 24 секунды (типичная запись составляет около 40 мс). Жесткий диск, который я пишу, не находится под нагрузкой, поэтому эта задержка не должна быть вызвана конфликтом (во время работы у него около 7% использования). Я отключил индексирование на моих HDFStore пишет. Мое приложение запускает множество других потоков, все из которых строят строки состояния печати, и поэтому кажется, что задача ввода-вывода блокирует все остальные потоки. Я потратил довольно много времени шагового через код, чтобы выяснить, где все замедляется, и это всегда в пределах метода, предоставленном расширением C, и это приводит к моему вопросу ..

  1. Can Python (Я использую 3.5) выполнить упреждение в расширении C? Concurrency: Are Python extensions written in C/C++ affected by the Global Interpreter Lock? Кажется, указывает, что это не происходит, если только расширение не дает.
  2. Имеет ли код Pandas HDF5 C какой-либо доход для ввода-вывода? Если да, значит ли это, что задержка связана с ограниченной задачей CPU? Я отключил индексирование.
  3. Любые предложения относительно того, как я могу получить несколько последовательные тайминги? Я подумываю переместить код HDF5 в другой процесс. Это только помогает в определенной степени, хотя, поскольку я не могу реально терпеть ~ 20 секунд, пишет все равно, особенно когда они непредсказуемы.

Вот пример, который вы можете запустить, чтобы увидеть проблему:

import pandas as pd 
import numpy as np 
from timeit import default_timer as timer 
import datetime 
import random 
import threading 
import time 

def write_samples(store, samples, overwrite): 
    frame = pd.DataFrame(samples, dtype='float64') 

    if not overwrite: 
     store.append("df", frame, format='table', index=False) 
    else: 
     store.put("df", frame, format='table', index=False) 

def begin_io(): 
    store = pd.HDFStore("D:\\slow\\test" + str(random.randint(0,100)) + ".h5", mode='w', complevel=0) 

    counter = 0 
    while True: 
     data = np.random.rand(50000, 1) 
     start_time = timer() 
     write_samples(store, data, counter == 0) 
     end_time = timer() 

     print("IO Done  : %s (%.2f ms, %d)" % (datetime.datetime.now(), (end_time - start_time) * 1000, counter)) 

     counter += 1 

    store.close() 

def dummy_thread(): 
    previous = timer() 
    while True: 
     now = timer() 
     print("Dummy Thread : %s (%d ms)" % (datetime.datetime.now(), (now - previous) * 1000)) 
     previous = now 
     time.sleep(0.01) 


if __name__ == '__main__': 
    threading.Thread(target=dummy_thread).start() 
    begin_io() 

вы получите результат, похожий на:

IO Done  : 2016-04-08 10:51:14.100479 (3.63 ms, 470) 
Dummy Thread : 2016-04-08 10:51:14.101484 (12 ms) 
IO Done  : 2016-04-08 10:51:14.104475 (3.01 ms, 471) 
Dummy Thread : 2016-04-08 10:51:14.576640 (475 ms) 
IO Done  : 2016-04-08 10:51:14.576640 (472.00 ms, 472) 
Dummy Thread : 2016-04-08 10:51:14.897756 (321 ms) 
IO Done  : 2016-04-08 10:51:14.898782 (320.79 ms, 473) 
IO Done  : 2016-04-08 10:51:14.901772 (3.29 ms, 474) 
IO Done  : 2016-04-08 10:51:14.905773 (2.84 ms, 475) 
IO Done  : 2016-04-08 10:51:14.908775 (2.96 ms, 476) 
Dummy Thread : 2016-04-08 10:51:14.909777 (11 ms) 
+0

Любой код? Как насчет того, чтобы сузить проблему до короткого простого скрипта, который мог бы написать ожидаемое количество некоторых фиктивных данных. Вы увидите, если он все еще страдает от одной и той же проблемы или работает хорошо. –

+0

@JanVlcinsky Я воспроизвел его в сценарии, который просто постоянно добавляется к HDFStore. Я упрощу это, а затем опубликую здесь. – user3870920

+0

@JanVlcinsky Добавлен код-пример – user3870920

ответ

2

Ответ: нет, эти авторы не отпускайте GIL , См. Документацию here. Я знаю, что вы на самом деле не пытаетесь написать с помощью несколько потоков, но это должно вас подсказать. Существуют сильные блокировки, которые хранятся, когда записи действительно помогают предотвратить множественную запись. Оба PyTables и h5py делают это как часть стандарта HDF5.

Вы можете посмотреть SWMR, хотя напрямую не поддерживается пандами. PyTables Документы here и here указывают на решения. В основном это связано с тем, что отдельный процесс вытягивает данные из очередей и записывает их.

Это, как правило, гораздо более масштабируемый шаблон в любом случае.

+0

Да, вытащить его в отдельный процесс - единственный способ, которым я могу думать, чтобы не блокировать другие потоки. Тем не менее, я все еще не совсем уверен, почему-то есть большая задержка время от времени. Я думаю, что это как-то увеличивает файл, но я не уверен, что конкретно делает. Я пытался установить большое количество для 'expectedrows' при создании, но это не помогает. Любые идеи о том, где искать? – user3870920

+0

'' PyTables'' chunk пишет (вычисляется относительно '' expectedrows''), но я думаю, что фактический размер записи (флеш) зависит от реализации (например, вы не знаете). Если вы чувствительны к времени, то делаете ставки, чтобы перетасовать другие процессы или, возможно, используйте что-то вроде '' msgpack'', которое просто сбрасывается прямо на диск (и является добавочным). Как правило, это захват в режиме реального времени. Вы позже отправляете материалы. – Jeff

+0

Я думал, что HDF5 будет хорошей ставкой, потому что это захват данных в реальном времени, но мы позволяем пользователю прокручивать назад свою историю, поэтому мы также должны иметь возможность читать куски (сеансы могут быть часами, поэтому 250 к/сек добавляет довольно быстро). Я думаю, если я смогу найти способ сделать его более кратковременным и переместить его в другой процесс, все должно быть в порядке. Я думаю, что немного разочаровывает, что GIL не выпущен для IO, но, вероятно, есть какая-то логика решение. – user3870920

1

Благодарим за предоставление рабочего кода. Я изменил это, чтобы получить некоторое представление, а затем создал модифицированную версию с использованием многопроцессорной обработки.

Модифицированный потоковая версия

Все модификации просто, чтобы получить больше информации из, нет концептуальных изменений. Все идет в один файл mthread.py и прокомментирован частично.

Импорт, как обычно:

import pandas as pd 
import numpy as np 
from timeit import default_timer as timer 
import datetime 
import random 
import threading 
import logging 

write_samples получил некоторую регистрацию:

def write_samples(store, samples, overwrite): 
    wslog = logging.getLogger("write_samples") 
    wslog.info("starting") 
    frame = pd.DataFrame(samples, dtype='float64') 

    if overwrite: 
     store.put("df", frame, format='table', index=False) 
    else: 
     store.append("df", frame, format='table', index=False) 
    wslog.info("finished") 

begin_io получил максимальную duaration, превысив, что результаты времени в записи журнала ПРЕДУПРЕЖДЕНИЕ:

def begin_io(maxduration=500): 
    iolog = logging.getLogger("begin_io") 
    iolog.info("starting") 
    try: 
     fname = "data/tab" + str(random.randint(0, 100)) + ".h5" 
     iolog.debug("opening store %s", fname) 
     with pd.HDFStore(fname, mode='w', complevel=0) as store: 
      iolog.debug("store %s open", fname) 

      counter = 0 
      while True: 
       data = np.random.rand(50000, 1) 
       start_time = timer() 
       write_samples(store, data, counter == 0) 
       end_time = timer() 
       duration = (end_time - start_time) * 1000 
       iolog.debug("IO Done  : %s (%.2f ms, %d)", 
          datetime.datetime.now(), 
          duration, 
          counter) 
       if duration > maxduration: 
        iolog.warning("Long duration %s", duration) 
       counter += 1 
    except Exception: 
     iolog.exception("oops") 
    finally: 
     iolog.info("finished") 

dummy_thread получили изменен, чтобы d также испускает ПРЕДУПРЕЖДЕНИЕ, если занимает слишком много времени:

def dummy_thread(pill2kill, maxduration=500): 
    dtlog = logging.getLogger("dummy_thread") 
    dtlog.info("starting") 
    try: 
     previous = timer() 
     while not pill2kill.wait(0.01): 
      now = timer() 
      duration = (now - previous) * 1000 
      dtlog.info("Dummy Thread : %s (%d ms)", 
         datetime.datetime.now(), 
         duration) 
      if duration > maxduration: 
       dtlog.warning("Long duration %s", duration) 
      previous = now 
     dtlog.debug("stopped looping.") 
    except Exception: 
     dtlog.exception("oops") 
    finally: 
     dtlog.info("finished") 

и, наконец, мы все называем. Не стесняйтесь изменять уровни журналов, WARNING показывает только чрезмерное время, INFO и DEBUG рассказывают гораздо больше.

if __name__ == '__main__': 
    logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s' 
    logging.basicConfig(format=logformat, 
         level=logging.WARNING) 

    pill2kill = threading.Event() 
    t = threading.Thread(target=dummy_thread, args=(pill2kill, 500)) 
    t.start() 
    try: 
     begin_io(500) 
    finally: 
     pill2kill.set() 
     t.join() 

Выполнение кода я получаю результаты, как вы описали:

2016-04-08 15:29:11,428 [WARNING] - begin_io: Long duration 5169.03591156 
2016-04-08 15:29:11,429 [WARNING] - dummy_thread: Long duration 5161.45706177 
2016-04-08 15:29:27,305 [WARNING] - begin_io: Long duration 1447.40581512 
2016-04-08 15:29:27,306 [WARNING] - dummy_thread: Long duration 1450.75201988 
2016-04-08 15:29:32,893 [WARNING] - begin_io: Long duration 1610.98194122 
2016-04-08 15:29:32,894 [WARNING] - dummy_thread: Long duration 1612.98394203 
2016-04-08 15:29:34,930 [WARNING] - begin_io: Long duration 823.182821274 
2016-04-08 15:29:34,930 [WARNING] - dummy_thread: Long duration 815.275907516 
2016-04-08 15:29:43,640 [WARNING] - begin_io: Long duration 510.369062424 
2016-04-08 15:29:43,640 [WARNING] - dummy_thread: Long duration 511.776924133 

Из значений понятно, что в то время как begin_io очень занят и delayd (вероятно, во время данных записываются на диск) , dummy_thread также задерживается почти на такое же количество времени.

Версия с многопроцессорной - работает хорошо

Я изменил код для запуска в различных процессах и с тех пор он действительно не блокирует dummy_thread.

2016-04-08 15:38:12,487 [WARNING] - begin_io: Long duration 755.397796631 
2016-04-08 15:38:14,127 [WARNING] - begin_io: Long duration 1434.60512161 
2016-04-08 15:38:15,725 [WARNING] - begin_io: Long duration 848.396062851 
2016-04-08 15:38:24,290 [WARNING] - begin_io: Long duration 1129.17089462 
2016-04-08 15:38:25,609 [WARNING] - begin_io: Long duration 1059.08918381 
2016-04-08 15:38:31,165 [WARNING] - begin_io: Long duration 646.969079971 
2016-04-08 15:38:37,273 [WARNING] - begin_io: Long duration 1699.17201996 
2016-04-08 15:38:43,788 [WARNING] - begin_io: Long duration 1555.341959 
2016-04-08 15:38:47,765 [WARNING] - begin_io: Long duration 639.196872711 
2016-04-08 15:38:54,269 [WARNING] - begin_io: Long duration 1690.57011604 
2016-04-08 15:39:06,397 [WARNING] - begin_io: Long duration 1998.33416939 
2016-04-08 15:39:16,980 [WARNING] - begin_io: Long duration 2558.51006508 
2016-04-08 15:39:21,688 [WARNING] - begin_io: Long duration 1132.73501396 
2016-04-08 15:39:26,450 [WARNING] - begin_io: Long duration 876.784801483 
2016-04-08 15:39:29,809 [WARNING] - begin_io: Long duration 709.135055542 
2016-04-08 15:39:31,748 [WARNING] - begin_io: Long duration 677.506923676 
2016-04-08 15:39:41,854 [WARNING] - begin_io: Long duration 770.184993744 

код с многопроцессорной здесь:

import pandas as pd 
import numpy as np 
from timeit import default_timer as timer 
import datetime 
import random 
import multiprocessing 
import time 
import logging 


def write_samples(store, samples, overwrite): 
    wslog = logging.getLogger("write_samples") 
    wslog.info("starting") 
    frame = pd.DataFrame(samples, dtype='float64') 

    if overwrite: 
     store.put("df", frame, format='table', index=False) 
    else: 
     store.append("df", frame, format='table', index=False) 
    wslog.info("finished") 


def begin_io(pill2kill, maxduration=500): 
    iolog = logging.getLogger("begin_io") 
    iolog.info("starting") 
    try: 
     fname = "data/tab" + str(random.randint(0, 100)) + ".h5" 
     iolog.debug("opening store %s", fname) 
     with pd.HDFStore(fname, mode='w', complevel=0) as store: 
      iolog.debug("store %s open", fname) 

      counter = 0 
      while not pill2kill.wait(0): 
       data = np.random.rand(50000, 1) 
       start_time = timer() 
       write_samples(store, data, counter == 0) 
       end_time = timer() 
       duration = (end_time - start_time) * 1000 
       iolog.debug("IO Done  : %s (%.2f ms, %d)", 
          datetime.datetime.now(), 
          duration, 
          counter) 
       if duration > maxduration: 
        iolog.warning("Long duration %s", duration) 
       counter += 1 
    except Exception: 
     iolog.exception("oops") 
    finally: 
     iolog.info("finished") 


def dummy_thread(pill2kill, maxduration=500): 
    dtlog = logging.getLogger("dummy_thread") 
    dtlog.info("starting") 
    try: 
     previous = timer() 
     while not pill2kill.wait(0.01): 
      now = timer() 
      duration = (now - previous) * 1000 
      dtlog.info("Dummy Thread : %s (%d ms)", 
         datetime.datetime.now(), 
         duration) 
      if duration > maxduration: 
       dtlog.warning("Long duration %s", duration) 
      previous = now 
     dtlog.debug("stopped looping.") 
    except Exception: 
     dtlog.exception("oops") 
    finally: 
     dtlog.info("finished") 


if __name__ == '__main__': 
    logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s' 
    logging.basicConfig(format=logformat, 
         level=logging.WARNING) 
    pill2kill = multiprocessing.Event() 
    dp = multiprocessing.Process(target=dummy_thread, args=(pill2kill, 500,)) 
    dp.start() 
    try: 
     p = multiprocessing.Process(target=begin_io, args=(pill2kill, 500,)) 
     p.start() 
     time.sleep(100) 
    finally: 
     pill2kill.set() 
     dp.join() 
     p.join() 

Выводы

записи данных в файл HDF5 действительно блокирует другие потоки и многопроцессорная версия требуется.

Если вы ожидаете, что dummy_thread сделать некоторую реальную работу (например, сбор данных для хранения), и вы хотите отправить данные здесь сериализатором HDF5, вы должны какой-то обмен сообщениями - либо с помощью multiprocessing.Queue, Pipe или, возможно, использовать ZeroMQ (например, PUSH - PULL гнездо ). С ZeroMQ вы можете выполнять сохранение данных даже на другом компьютере.

РЕДАКТИРОВАТЬ/ПРЕДУПРЕЖДЕНИЕ: Предоставленный код может сэкономить данные некогда, я сделал это для измерения производительности и не сделал его водонепроницаемым. Когда Ctrl-C во время обработки, иногда я получаю поврежденный файл. Эта проблема я рассматриваю вне сферы этого вопроса (и проблема должна быть решена путем тщательной остановки текущего процесса).

+0

Спасибо за добавление более полезного ведения журнала, вам нужно было искать выход с помощью моего кода :) Думаю, мы пришли к одному и тому же выводу - нужно использовать многопроцессорность. – user3870920

+0

@ user3870920 Ваш вопрос помог мне лучше понять GIL. Спасибо за это. Обратите внимание, что мой журнал печатает только раз более 500 мс, поэтому короткие (обычно около 10 мс) не отображаются. –

 Смежные вопросы

  • Нет связанных вопросов^_^