2016-12-26 5 views
2

Я новичок в асинхронном программировании и запутался с ним.Время ответа на запрос Tornado слишком велико даже с gen.coroutine

Я запутался, когда я украсил RequestHandler gen.coroutine, но обнаружил, что запрос по-прежнему заблокирован.

Вот краткий код, с питоном 2.7.11 и торнадо 4.4.1

@gen.coroutine 
def store_data(data): 
    try: 
     # parse_data 
     ... 
    except ParseError as e: 
     logger.warning(e) 
     return 
    yield motor.insert_many(parsed_data) # asynchronous mongo 
    print motor.count() 

class MainHandler(RequestHandler): 
    @gen.coroutine 
    def post(self): 
     try: 
      some_argument = int(self.get_argument("some", 0)) 
      data = self.request.body 
     except Exception: 
      self.write("Improper Argument") 
      self.finish() 
      return 
     IOLoop.current().spawn_callback(lambda: store_data(data)) 
     self.write("Request Done") 
     self.finish() 

И я сделал тест с 10 нитями. По времени отклика в журнале доступ, я полагаю, некоторые запросы были заблокированы

[I 161222 15:40:22 web:1971] 200 POST /upload/ (::1) 9.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 8.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 8.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 7.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 8.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 9.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 8.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 9.00ms 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 701.00ms # Seem blocked 
[I 161222 15:40:23 web:1971] 200 POST /upload/ (::1) 696.00ms # Seem blocked 

Update

отслеживающего послание set_blocking_log_threshold(0.5)

File "********", line 74, in <dictcomp> 
    data = [dict({"sid": sid}, **{key: value for key, value in i.iteritems() 

Вся линейка этого кода

data = [dict({"sid": sid}, **{key: value for key, value in i.iteritems() if key in need_cols}) for i in v_data] 

И распакованы логика что-то вроде этого

data = [] 
# `v_data` is a huge dict which could be considered as a mongo collection, and `i` as a mongo document 
for i in v_data: 
    temp = {key: value for key, value in i.iteritems() if key in need_cols} # discard some keys 
    temp["sid"] = sid # add same `sid` to all items 
    data.append(temp) 

я изменил его к генератору

def data_generator(v_data, need_cols, sid): 
    for i in v_data: 
     temp = {key: value for key, value in i.iteritems() if key in need_cols} # discard some keys 
     temp["sid"] = sid # add same `sid` to all items 
     yield temp 

@gen.coroutine 
def store_data(data): 
    try: 
     # parse_data 
     ... 
    except ParseError as e: 
     logger.warning(e) 
     return 
    ge = data_generator(v_data, need_cols, sid) 
    yield motor.insert_many(ge) # asynchronous mongo 
    print motor.count() 

Нет Пороговые журналы предупреждения сообщалось больше, но время отклика, казалось, все еще заблокирован

[I 170109 17:26:32 web:1971] 200 POST /upload/ (::1) 3.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 2.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 4.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 3.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 3.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 2.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 354.00ms 
[I 170109 17:26:33 web:1971] 200 POST /upload/ (::1) 443.00ms 

Затем я установил порог в 0.2 с. Получил это сообщение

File "*******", line 76, in store_data 
    increment = json.load(fr) 
    File "/usr/local/python2.7/lib/python2.7/json/__init__.py", line 291, in load 
    **kw) 
    File "/usr/local/python2.7/lib/python2.7/json/__init__.py", line 339, in loads 
    return _default_decoder.decode(s) 
    File "/usr/local/python2.7/lib/python2.7/json/decoder.py", line 364, in decode 
    obj, end = self.raw_decode(s, idx=_w(s, 0).end()) 
    File "/usr/local/python2.7/lib/python2.7/json/decoder.py", line 380, in raw_decode 
    obj, end = self.scan_once(s, idx) 

Теперь я не знаю, как сделать это заявление асинхронного

ответ

0

Я думаю, что проблема может быть о том, как вы вызываете вашу функцию store_data сопрограмму. Вы должны правильно называть сопрограммы. Как это уже here:

почти во всех случаях, любую функцию, которая вызывает сопрограмма должна быть сам по себе сопрограммным и использовать yield ключевое слово в вызове.

Так store_data должен называться как этот yield store_data() не store_data(). Здесь

IOLoop.current().spawn_callback(lambda: store_data(data)) 

Я предполагаю, что вы используете lambda, потому что вы хотите, чтобы дать data вашей функции в качестве аргумента, но вы можете сделать это с самого spawn_callback. Вы можете попробовать:

IOLoop.current().spawn_callback(store_data, data) 

Надеюсь, это поможет.

0

Отделка функции @gen.coroutine не приносит пользы, если эта функция никогда не yields.

Ваш метод post() выглядит правильно: он никогда не блокирует или ничего не мешает IOLoop. Тем не менее, IOLoop является общим ресурсом, и все, что блокирует его, может вызвать время, которое вы видите. Я подозреваю, что что-то, что вы не показываете нам в store_data (или в другом месте программы), блокируется. Чтобы выделить, где эта блокировка может быть, вызовите IOLoop.current().set_blocking_log_threshold(0.5) в начале вашей программы, и она будет регистрировать трассировку стека, когда IOLoop заблокирован на полсекунды.

+0

Спасибо, Бен! Я обновил описание вопроса. Не могли бы вы оказать мне дальнейшую помощь? – Morry

+0

Если у вас столько данных, что просто синтаксический анализ json блокируется слишком долго, вам просто придется переместить эту работу в ThreadPoolExecutor. 'increment = yield executor.submit (json.load, fr)'. После того как вы разобрали json, вы можете разделить его на более мелкие партии для дальнейшей обработки. Существуют также сторонние библиотеки json, которые могут быть быстрее и/или поддерживать потоковый интерфейс. –