Это действительно интересный вопрос, так как данные достаточно велики, чтобы легко вписаться в память.
Прежде всего, о I/O: если это CSV я хотел бы использовать стандартную библиотеку csv.reader()
объект, например, так (я предполагаю, что Python 3):
with open('big.csv', newline='') as f:
for row in csv.reader(f):
...
Тогда я, наверное, сохраняйте скользящее окно строк в экземпляре collections.deque(maxlen=WINDOW_SIZE)
с размером окна, установленным, возможно, 20 на основе вашего описания. Прочитайте первые строки WINDOW_SIZE
в deque, затем введите основной цикл чтения, который выведет самый левый элемент (строка) в deque, а затем добавьте текущую строку.
После добавления каждой строки, если временная метка текущей строки предшествует отметке времени предыдущих строк (window[-2]
), затем отберите deque. Вы не можете сортировать Deque непосредственно, но сделать что-то вроде:
window = collections.deque(sorted(window), maxlen=WINDOW_SIZE)
алгоритм Timsort Пайтона обрабатывает уже отсортированных работает эффективно, так что это должно быть очень быстрым (линейное время).
Если размер окна и количество строк не по порядку невелики (как кажется, они могут быть), я считаю, что общий алгоритм будет O (N), где N - количество строк в файл данных, поэтому линейное время.
ОБНОВЛЕНИЕ: Я написал демо-код для создания такого файла, а затем отсортировал его с использованием вышеуказанной методики - см. this Gist, протестированный на Python 3.5. Это намного быстрее, чем утилита sort
по тем же данным, а также быстрее, чем функция sorted()
Python после N = 1,000,000. Кстати, функция, которая генерирует демонстрационный CSV, значительно медленнее, чем код сортировки. :-) Мои результаты синхронизации process_sliding()
для различных N (определенно выглядит линейно-выход):
- N = 1000000: 3.5с
- N = 2000000: 6.6s
- N = 10000000: 32.9s
для справки, вот код для моей версии process_sliding()
:
def process_sliding(in_filename, out_filename, window_size=20):
with (open(in_filename, newline='') as fin,
open(out_filename, 'w', newline='') as fout):
reader = csv.reader(fin)
writer = csv.writer(fout)
first_window = sorted(next(reader) for _ in range(window_size))
window = collections.deque(first_window, maxlen=window_size)
for row in reader:
writer.writerow(window.popleft())
window.append(row)
if row[0] < window[-2][0]:
window = collections.deque(sorted(window), maxlen=window_size)
for row in window:
writer.writerow(row)
Я хотел бы использовать iteetools.islice, чтобы получить Chuc k данных – Copperfield
@Copperfield Вы можете уточнить? Если вы имеете в виду, когда вы получаете этот кусок 'first_window', я думал об этом, но я думаю, что часто проще и проще избегать импорта и использовать более простые встроенные функции. –
Да, я имею в виду «first_window». Также ваш текущий способ может вызвать исключение StopIteration, возможно, для этого варианта использования это не проблема, но зачем оставлять там легко устранимую проблему? – Copperfield