Ответ 1
Я думаю, вам может понадобиться сделать ваш метод add_task
осведомленным о том, вызывается ли его из потока, отличного от цикла события. Таким образом, если он вызывается из одного потока, вы можете просто вызвать asyncio.async
напрямую, в противном случае он может выполнить некоторую дополнительную работу, чтобы передать задачу из потока цикла в вызывающий поток. Вот пример:
import time
import asyncio
import functools
from threading import Thread, current_thread, Event
from concurrent.futures import Future
class B(Thread):
def __init__(self, start_event):
Thread.__init__(self)
self.loop = None
self.tid = None
self.event = start_event
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.tid = current_thread()
self.loop.call_soon(self.event.set)
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def add_task(self, coro):
"""this method should return a task object, that I
can cancel, not a handle"""
def _async_add(func, fut):
try:
ret = func()
fut.set_result(ret)
except Exception as e:
fut.set_exception(e)
f = functools.partial(asyncio.async, coro, loop=self.loop)
if current_thread() == self.tid:
return f() # We can call directly if we're not going between threads.
else:
# We're in a non-event loop thread so we use a Future
# to get the task from the event loop thread once
# it ready.
fut = Future()
self.loop.call_soon_threadsafe(_async_add, f, fut)
return fut.result()
def cancel_task(self, task):
self.loop.call_soon_threadsafe(task.cancel)
@asyncio.coroutine
def test():
while True:
print("running")
yield from asyncio.sleep(1)
event = Event()
b = B(event)
b.start()
event.wait() # Let the loop thread signal us, rather than sleeping
t = b.add_task(test()) # This is a real task
time.sleep(10)
b.stop()
Сначала мы сохраняем идентификатор потока цикла событий в методе run
, поэтому мы можем выяснить, поступают ли вызовы на add_task
из других потоков позже. Если add_task
вызывается из потока цикла без события, мы используем call_soon_threadsafe
для вызова функции, которая будет как планировать сопрограмму, так и затем использовать concurrent.futures.Future
, чтобы передать задачу обратно вызывающему потоку, который ждет результат Future
.
Заметка об отмене задачи: вы, когда вы вызываете cancel
в Task
, a CancelledError
будут подняты в сопрограмме при следующем запуске цикла события. Это означает, что сопрограмма, которая завершает задачу Task, будет прервана из-за исключения в следующий раз, когда она нажмет предел текучести - если сопрограмма не улавливает CancelledError
и предотвращает ее прерывание. Также обратите внимание, что это работает только в том случае, если выполняемая функция фактически является прерывистой сопрограммой; a asyncio.Future
, возвращаемый BaseEventLoop.run_in_executor
, например, не может быть действительно отменен, потому что он фактически обернут вокруг concurrent.futures.Future
, и они не могут быть отменены, как только их базовая функция начинает выполняться. В этом случае asyncio.Future
скажет, что его отменено, но функция, выполняемая в исполнителе, будет продолжать работать.
Изменить: Обновлен первый пример использования concurrent.futures.Future
вместо queue.Queue
, по предложению Андрея Светлова.
Примечание: asyncio.async
устарел с версии 3.4.4, используя asyncio.ensure_future
.