17

У меня есть многопоточное приложение python. Я хочу запустить цикл asyncio в потоке, а затем отправить кавычки и сопрограммы к нему из другого потока. Должно быть легко, но я не могу окунуться в вещи asyncio.python asyncio, как создавать и отменить задачи из другого потока

Я пришел к следующему решению, которое делает половину того, что я хочу, не стесняйтесь комментировать что-либо:

import asyncio 
from threading import Thread 

class B(Thread): 
    def __init__(self): 
     Thread.__init__(self) 
     self.loop = None 

    def run(self): 
     self.loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(self.loop) #why do I need that?? 
     self.loop.run_forever() 

    def stop(self): 
     self.loop.call_soon_threadsafe(self.loop.stop) 

    def add_task(self, coro): 
     """this method should return a task object, that I 
      can cancel, not a handle""" 
     f = functools.partial(self.loop.create_task, coro) 
     return self.loop.call_soon_threadsafe(f) 

    def cancel_task(self, xx): 
     #no idea 

@asyncio.coroutine 
def test(): 
    while True: 
     print("running") 
     yield from asyncio.sleep(1) 

b.start() 
time.sleep(1) #need to wait for loop to start 
t = b.add_task(test()) 
time.sleep(10) 
#here the program runs fine but how can I cancel the task? 

b.stop() 

Так, начиная и останавливая цикл работает нормально. Я думал о создании задачи с помощью create_task, но этот метод не является потоковым, поэтому я завернул его в call_soon_threadsafe. Но я хотел бы иметь возможность получить объект задачи, чтобы иметь возможность отменить задачу. Я мог бы делать сложный материал, используя Future and Condition, но должен быть более простой способ, разве это не так?

ответ

13

Я думаю, вам может понадобиться сделать ваш метод add_task осведомленным о том, вызывается ли его из потока, отличного от цикла события. Таким образом, если он вызывается из одного потока, вы можете просто позвонить asyncio.async напрямую, в противном случае он может выполнить дополнительную работу, чтобы передать задачу из потока цикла в вызывающий поток. Вот пример:

import time 
import asyncio 
import functools 
from threading import Thread, current_thread, Event 
from concurrent.futures import Future 

class B(Thread): 
    def __init__(self, start_event): 
     Thread.__init__(self) 
     self.loop = None 
     self.tid = None 
     self.event = start_event 

    def run(self): 
     self.loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(self.loop) 
     self.tid = current_thread() 
     self.loop.call_soon(self.event.set) 
     self.loop.run_forever() 

    def stop(self): 
     self.loop.call_soon_threadsafe(self.loop.stop) 

    def add_task(self, coro): 
     """this method should return a task object, that I 
      can cancel, not a handle""" 
     def _async_add(func, fut): 
      try: 
       ret = func() 
       fut.set_result(ret) 
      except Exception as e: 
       fut.set_exception(e) 

     f = functools.partial(asyncio.async, coro, loop=self.loop) 
     if current_thread() == self.tid: 
      return f() # We can call directly if we're not going between threads. 
     else: 
      # We're in a non-event loop thread so we use a Future 
      # to get the task from the event loop thread once 
      # it's ready. 
      fut = Future() 
      self.loop.call_soon_threadsafe(_async_add, f, fut) 
      return fut.result() 

    def cancel_task(self, task): 
     self.loop.call_soon_threadsafe(task.cancel) 


@asyncio.coroutine 
def test(): 
    while True: 
     print("running") 
     yield from asyncio.sleep(1) 

event = Event() 
b = B(event) 
b.start() 
event.wait() # Let the loop's thread signal us, rather than sleeping 
t = b.add_task(test()) # This is a real task 
time.sleep(10) 
b.stop() 

Во-первых, мы сохраняем идентификатор потока из цикла событий в методе run, так что мы можем выяснить, если звонки на add_task приходят из других потоков позже. Если add_task вызывается из потока цикла, отличного от события, мы используем call_soon_threadsafe для вызова функции, которая будет как планировать сопрограмму, так и затем использовать concurrent.futures.Future, чтобы передать задачу обратно вызывающему потоку, который ждет результата Future.

Замечание об отмене задания: Вы когда вы звоните cancel на Task, CancelledError будет поднят в сопрограммы следующий раз, когда запускается цикл обработки событий. Это означает, что сопрограмма, которая завершает задачу Task, будет прервана из-за исключения в следующий раз, когда она ударит по пределу текучести - если только coroutine не поймает CancelledError и не позволит себе прервать. Также обратите внимание, что это работает только в том случае, если выполняемая функция фактически является прерывистой сопрограммой; Например, asyncio.Future, возвращаемый BaseEventLoop.run_in_executor, действительно не может быть отменен, поскольку он фактически обернут вокруг concurrent.futures.Future, и они не могут быть отменены, как только их базовая функция начнет выполняться. В этих случаях asyncio.Future скажет, что его отменили, но функция, фактически запущенная в исполнителе, будет продолжать работать.

Edit: Обновил первый пример использования concurrent.futures.Future, вместо queue.Queue, по предложению Андрея Светлова.

Примечание: asyncio.async устарел с версии 3.4.4 вместо asyncio.ensure_future.

+0

Спасибо, например, это помогло мне исправить несколько вопросов, которые у меня были. Кстати, я также должен был инициировать Future with Future (loop = self.loop), иначе в некоторых случаях будущее будет принимать неправильный цикл –

+0

@OlivierRD Вы должны использовать 'concurrent.futures.Future', а не' asyncio.Future'. 'concurrent.futures.Future' не принимает ключевое слово' loop'. – dano

+0

Документация, похоже, говорит, что она: https://docs.python.org/3/library/asyncio-task.html#asyncio.Future –

6

Вы все делаете правильно. Для задачи остановки метода делают

class B(Thread): 
    # ... 
    def cancel(self, task): 
     self.loop.call_soon_threadsafe(task.cancel) 

BTW вы имеют чтобы настроить цикл событий для созданного потока явно

self.loop = asyncio.new_event_loop() 
asyncio.set_event_loop(self.loop) 

потому что asyncio создает неявный цикл событий только для основного потока.

+0

Недостающий кусок здесь, как получить дескриптор 'task' в первую очередь. Поскольку OP должен использовать 'call_soon_threadsafe (self.loop.create_task)' в методе 'add_task', у него фактически нет дескриптора задачи после добавления его в цикл. – dano

+1

Получил это. Ты прав. @dano BTW вы можете использовать concurrent.futures.Future вместо очереди в своем ответе. Я думаю, что он чище. –

+0

Да, я согласен, что использование 'Future' лучше, чем' Queue'. Я обновил свой ответ, чтобы отразить это. Благодаря! – dano

5

только для справки здесь это код, который я, наконец, реализовал на основе помощи, которую я получил на этом сайте, проще, поскольку мне не нужны все функции. еще раз спасибо!

class B(Thread): 
    def __init__(self): 
     Thread.__init__(self) 
     self.loop = None 

    def run(self): 
     self.loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(self.loop) 
     self.loop.run_forever() 

    def stop(self): 
     self.loop.call_soon_threadsafe(self.loop.stop) 

    def _add_task(self, future, coro): 
     task = self.loop.create_task(coro) 
     future.set_result(task) 

    def add_task(self, coro): 
     future = Future() 
     p = functools.partial(self._add_task, future, coro) 
     self.loop.call_soon_threadsafe(p) 
     return future.result() #block until result is available 

    def cancel(self, task): 
     self.loop.call_soon_threadsafe(task.cancel) 
2

Начиная с версии 3.4.4 asyncio предоставляет функцию run_coroutine_threadsafe представить сопрограмму объект из потока на цикл обработки событий. Он возвращает concurrent.futures.Future для получения результата или отмены задания.

Используя ваш пример:

@asyncio.coroutine 
def test(loop): 
    try: 
     while True: 
      print("Running") 
      yield from asyncio.sleep(1, loop=loop) 
    except asyncio.CancelledError: 
     print("Cancelled") 
     loop.stop() 
     raise 

loop = asyncio.new_event_loop() 
thread = threading.Thread(target=loop.run_forever) 
future = asyncio.run_coroutine_threadsafe(test(loop), loop) 

thread.start() 
time.sleep(5) 
future.cancel() 
thread.join() 
+0

Чтобы предотвратить состояние гонки или тупик, не вызывайте 'future.cancel()' непосредственно. Вместо этого используйте 'loop.call_soon_threadsafe (future.cancel)'. См. [Здесь] (https://docs.python.org/3.4/library/asyncio-dev.html#concurrency-and-multithreading). – changyuheng

+1

@ ChangYu-heng Это верно для фреймов [asyncio.Future] (https://docs.python.org/3.4/library/asyncio-task.html#asyncio.Future), но [run_coroutine_threadsafe] (https: // docs.python.org/3.4/library/asyncio-task.html#asyncio.run_coroutine_threadsafe) возвращает [concurrent.futures.Future] (https://docs.python.org/3.4/library/concurrent.futures.html# concurrent.futures.Future), который является потокобезопасным и не зависит от какого-либо цикла событий. – Vincent

+0

@Vicent Извините, я не внимательно изучил оригинальный вопрос. Таким образом, дополнительным комментарием для этого будет: use 'loop.call_soon_threadsafe (future.cancel)', если вы собираетесь выполнить 'future.cancel()' из потока, который не является циклом события. – changyuheng