2014-10-28 2 views
0

PyMongo supports генераторы для пакетной обработки с sDB.insert(iter_something(converted)). Функции записи с массовой загрузкой, которые выполняют операции записи в пакетах, чтобы уменьшить количество сетевых обходов и увеличить пропускную способность записи.Функции обработки писем PyMongo с многопроцессорными и генераторами

Следующий код, похоже, работает, но я не могу ли PyMongo по-прежнему выполнять итерацию генератора вместе с многопроцессорной обработкой, пока не получит 1000 документов или 16 МБ данных, а затем приостановит генератор, пока он вставляет пакет в MongoDB.

#!/usr/bin/env python 
from __future__ import absolute_import, division, print_function 
from itertools import groupby 
from pymongo import MongoClient 
from multiprocessing import Process, JoinableQueue 
import csv 

# > use test 
# switched to db test 
# > db.createCollection("abc") 
# { "ok" : 1 } 
# > db.abc.find() 


parts = [["Test", "A", "B01", 828288, 1, 7, 'C', 5], 
    ["Test", "A", "B01", 828288, 1, 7, 'T', 6], 
    ["Test", "A", "B01", 171878, 3, 7, 'C', 5], 
    ["Test", "A", "B01", 171878, 3, 7, 'T', 6], 
    ["Test", "A", "B01", 871963, 3, 9, 'A', 5], 
    ["Test", "A", "B01", 871963, 3, 9, 'G', 6], 
    ["Test", "A", "B01", 1932523, 1, 10, 'T', 4], 
    ["Test", "A", "B01", 1932523, 1, 10, 'A', 5], 
    ["Test", "A", "B01", 1932523, 1, 10, 'X', 6], 
    ["Test", "A", "B01", 667214, 1, 14, 'T', 4], 
    ["Test", "A", "B01", 667214, 1, 14, 'G', 5], 
    ["Test", "A", "B01", 667214, 1, 14, 'G', 6]] 


def iter_something(rows): 
    key_names = ['type', 'name', 'sub_name', 'pos', 's_type', 'x_type'] 
    chr_key_names = ['letter', 'no'] 
    for keys, group in groupby(rows, lambda row: row[:6]): 
     result = dict(zip(key_names, keys)) 
     result['chr'] = [dict(zip(chr_key_names, row[6:])) for row in group] 
     yield result 

class Loading(Process): 

    def __init__(self, task_queue): 
     Process.__init__(self) 
     self.task_queue = task_queue 
     db = MongoClient().test 
     self.sDB = db["abc"] 

    def run(self): 
     while True: 
      doc = self.task_queue.get() 
      if doc is None: # None means shutdown 
       self.task_queue.task_done() 
       break 
      else: 
       self.sDB.insert(doc) 

def main(): 
    num_cores = 2 

    tasks = JoinableQueue() 

    threads = [Loading(tasks) for i in range(num_cores)] 

    for i, w in enumerate(threads): 
     w.start() 
     print('Thread ' + str(i+1) + ' has started!') 

    converters = [str, str, str, int, int, int, str, int] 
    with open("/home/mic/tmp/test.txt") as f: 
     reader = csv.reader(f, skipinitialspace=True) 
     converted = ([conv(col) for conv, col in zip(converters, row)] for row in reader) 
     # sDB.insert(iter_something(converted)) 

     # Enqueue jobs 
     for i in iter_something(converted): 
      tasks.put(i) 

    # Add None to kill each thread 
    for i in range(num_cores): 
     tasks.put(None) 

    # Wait for all of the tasks to finish 
    tasks.join() 


if __name__ == '__main__': 
    main() 
+0

Будет ли 'db = MongoClient(). Test' и' self.sDB = db ["abc"] 'в каждом потоке каждый раз перезаписывать базу данных? – user977828

ответ

0

В этом случае вы не воспользоваться пакетными вставками. Каждый вызов «self.sDB.insert (doc)» немедленно отправляет документ в MongoDB и ждет ответа с сервера. Вы можете попробовать это:

def run(self): 
    def gen(): 
     while True: 
      doc = self.task_queue.get() 
      if doc is None: # None means shutdown 
       self.task_queue.task_done() 
       break 

      else: 
       yield doc 

    try: 
     self.sDB.insert(gen()) 
    except InvalidOperation as e: 
     # Perhaps "Empty bulk write", this process received no documents. 
     print(e) 

Используйте mongosniff чтобы убедиться, что вы отправляете большие партии на сервер вместо вставки одного документа одновременно. В зависимости от количества документов и количества процессов некоторые процессы не могут получить никаких документов. PyMongo бросает InvalidOperation, если вы пытаетесь вставить из пустого итератора, поэтому я «вставляю» с помощью «try/except».

Кстати, вам не нужно вызывать createCollection с MongoDB: первая вставка в коллекцию создает его автоматически. createCollection необходим только в том случае, если вам нужны специальные опции, например, ограниченная коллекция.

+0

Спасибо, но теперь я получил 'InvalidOperation: не могу сделать пустой объемный напиток'. Как-то поставить None на очередь задач не работает. – user977828

+0

Я отредактирую свой ответ, чтобы обработать исключение InvalidOperation. –