RuntimeError: в потоке не существует цикла текущего события в async + apscheduler
У меня есть функция async и вам нужно запускать apscheduller каждые N минут.
Ниже приведен код python
URL_LIST = ['<url1>',
'<url2>',
'<url2>',
]
def demo_async(urls):
"""Fetch list of web pages asynchronously."""
loop = asyncio.get_event_loop() # event loop
future = asyncio.ensure_future(fetch_all(urls)) # tasks to do
loop.run_until_complete(future) # loop until done
async def fetch_all(urls):
tasks = [] # dictionary of start times for each url
async with ClientSession() as session:
for url in urls:
task = asyncio.ensure_future(fetch(url, session))
tasks.append(task) # create list of tasks
_ = await asyncio.gather(*tasks) # gather task responses
async def fetch(url, session):
"""Fetch a url, using specified ClientSession."""
async with session.get(url) as response:
resp = await response.read()
print(resp)
if __name__ == '__main__':
scheduler = AsyncIOScheduler()
scheduler.add_job(demo_async, args=[URL_LIST], trigger='interval', seconds=15)
scheduler.start()
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
# Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
try:
asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
pass
Но когда я попытался запустить его, у меня появилась следующая информация об ошибке
Job "demo_async (trigger: interval[0:00:15], next run at: 2017-10-12 18:21:12 +04)" raised an exception.....
..........\lib\asyncio\events.py", line 584, in get_event_loop
% threading.current_thread().name)
RuntimeError: There is no current event loop in thread '<concurrent.futures.thread.ThreadPoolExecutor object at 0x0356B150>_0'.
Не могли бы вы помочь мне с этим?
Python 3.6, APScheduler 3.3.1,
Ответы
Ответ 1
Просто передайте fetch_all
в scheduler.add_job()
напрямую. Планировщик asyncio поддерживает функции сопрограммы в качестве целевых заданий.
Если целевой вызов не является сопроводительной функцией, он будет запущен в рабочем потоке (из-за исторических причин), следовательно, исключение.
Ответ 2
В вашем def demo_async(urls)
попробуйте заменить:
loop = asyncio.get_event_loop()
с:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
Ответ 3
Важная вещь, которая не была упомянута, - то, почему ошибка происходит. Лично для меня знание, почему возникает ошибка, так же важно, как и решение самой проблемы.
Давайте посмотрим на реализацию get_event_loop
BaseDefaultEventLoopPolicy
:
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
...
def get_event_loop(self):
"""Get the event loop.
This may be None or an instance of EventLoop.
"""
if (self._local._loop is None and
not self._local._set_called and
isinstance(threading.current_thread(), threading._MainThread)):
self.set_event_loop(self.new_event_loop())
if self._local._loop is None:
raise RuntimeError('There is no current event loop in thread %r.'
% threading.current_thread().name)
return self._local._loop
Вы можете видеть, что self.set_event_loop(self.new_event_loop())
выполняется только при соблюдении всех перечисленных ниже условий:
-
self._local._loop is None
- _local._loop
не установлен -
not self._local._set_called
- set_event_loop
еще не был вызван -
isinstance(threading.current_thread(), threading._MainThread)
- текущий поток является основным (это не так в вашем случае)
Поэтому возникает исключение, потому что в текущем потоке не установлен цикл:
if self._local._loop is None:
raise RuntimeError('There is no current event loop in thread %r.'
% threading.current_thread().name)
Ответ 4
Используйте asyncio.run()
вместо прямого использования цикла событий. Создает новый цикл и закрывает его по окончании.
Вот как выглядит "запуск":
if events._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")
if not coroutines.iscoroutine(main):
raise ValueError("a coroutine was expected, got {!r}".format(main))
loop = events.new_event_loop()
try:
events.set_event_loop(loop)
loop.set_debug(debug)
return loop.run_until_complete(main)
finally:
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
events.set_event_loop(None)
loop.close()
Ответ 5
Поскольку этот вопрос продолжает появляться на первой странице, я напишу свою проблему и мой ответ здесь.
У меня был RuntimeError: There is no current event loop in thread 'Thread-X'.
при использовании колбы-сокета и Bleak.
Редактировать: хорошо, я реорганизовал свой файл и сделал класс.
Я инициализировал цикл в конструкторе, и теперь все работает нормально:
class BLE:
def __init__(self):
self.loop = asyncio.get_event_loop()
# function example, improvement of
# https://github.com/hbldh/bleak/blob/master/examples/discover.py :
def list_bluetooth_low_energy(self) -> list:
async def run() -> list:
BLElist = []
devices = await bleak.discover()
for d in devices:
BLElist.append(d.name)
return 'success', BLElist
return self.loop.run_until_complete(run())
Использование:
ble = path.to.lib.BLE()
list = ble.list_bluetooth_low_energy()
Оригинальный ответ:
Решение было глупым. Я не обратил внимания на то, что я сделал, но я переместил некоторый import
из функции, например так:
import asyncio, platform
from bleak import discover
def listBLE() -> dict:
async def run() -> dict:
# my code that keep throwing exceptions.
loop = asyncio.get_event_loop()
ble_list = loop.run_until_complete(run())
return ble_list
Поэтому я подумал, что мне нужно что-то изменить в своем коде, и я создал новый цикл обработки событий, используя этот фрагмент кода непосредственно перед строкой с get_event_loop()
:
loop = asyncio.new_event_loop()
loop = asyncio.set_event_loop()
В этот момент я был очень счастлив, так как у меня был цикл.
Но не отвечает. И мой код полагался на тайм-аут, чтобы вернуть некоторые значения, так что это было довольно плохо для моего приложения.
Мне потребовалось почти два часа, чтобы понять, что проблема заключалась в import
, и вот мой (рабочий) код:
def list() -> dict:
import asyncio, platform
from bleak import discover
async def run() -> dict:
# my code running perfectly
loop = asyncio.get_event_loop()
ble_list = loop.run_until_complete(run())
return ble_list