Эквивалент asyncio.Queues с рабочими "потоками"
Я пытаюсь выяснить, как переносить поточную программу на использование asyncio
. У меня есть много кода, который синхронизируется вокруг нескольких стандартных библиотек Queues
, в основном так:
import queue, random, threading, time
q = queue.Queue()
def produce():
while True:
time.sleep(0.5 + random.random()) # sleep for .5 - 1.5 seconds
q.put(random.random())
def consume():
while True:
value = q.get(block=True)
print("Consumed", value)
threading.Thread(target=produce).start()
threading.Thread(target=consume).start()
Один поток создает значения (возможно, пользовательский ввод), а другой поток что-то делает с ними. Дело в том, что эти потоки простаивают до тех пор, пока не появятся новые данные, после чего они просыпаются и что-то делают с ним.
Я пытаюсь реализовать этот шаблон с помощью asyncio, но я не могу понять, как заставить его "идти".
Мои попытки выглядят более или менее подобными (и вообще ничего не делают).
import asyncio, random
q = asyncio.Queue()
@asyncio.coroutine
def produce():
while True:
q.put(random.random())
yield from asyncio.sleep(0.5 + random.random())
@asyncio.coroutine
def consume():
while True:
value = yield from q.get()
print("Consumed", value)
# do something here to start the coroutines. asyncio.Task()?
loop = asyncio.get_event_loop()
loop.run_forever()
Я пробовал варианты использования сопрограмм, не используя их, обертывая вещи в Задачи, пытаясь заставить их создавать или возвращать фьючерсы и т.д.
Я начинаю думать, что имею неправильное представление о том, как я должен использовать asyncio (возможно, этот шаблон должен быть реализован по-другому, что я не знаю).
Любые указатели будут оценены.
Ответы
Ответ 1
Да, точно. Задачи - ваши друзья:
import asyncio, random
q = asyncio.Queue()
@asyncio.coroutine
def produce():
while True:
yield from q.put(random.random())
yield from asyncio.sleep(0.5 + random.random())
@asyncio.coroutine
def consume():
while True:
value = yield from q.get()
print("Consumed", value)
loop = asyncio.get_event_loop()
loop.create_task(produce())
loop.create_task(consume())
loop.run_forever()
asyncio.ensure_future
также можно использовать для создания задачи.
И имейте в виду: q.put()
является сопрограммой, поэтому вы должны использовать yield from q.put(value)
.
UPD
Переключено с asyncio.Task()
/asyncio.async()
на новый бренд API loop.create_task()
и asyncio.ensure_future()
в примере.
Ответ 2
Здесь то, что я использую в производстве, переместилось в gist: https://gist.github.com/thehesiod/7081ab165b9a0d4de2e07d321cc2391d
Ответ 3
Немного позже и, возможно, OT, имейте в виду, что вы можете потреблять из Queue
из нескольких задач, поскольку они были независимыми потребителями.
Следующий фрагмент показывает в качестве примера, как вы можете получить один и тот же шаблон пула потоков с задачами asyncio
.
q = asyncio.Queue()
async def sum(x):
await asyncio.sleep(0.1) # simulates asynchronously
return x
async def consumer(i):
print("Consumer {} started".format(i))
while True:
f, x = await q.get()
print("Consumer {} procesing {}".format(i, x))
r = await sum(x)
f.set_result(r)
async def producer():
consumers = [asyncio.ensure_future(consumer(i)) for i in range(5)]
loop = asyncio.get_event_loop()
tasks = [(asyncio.Future(), x) for x in range(10)]
for task in tasks:
await q.put(task)
# wait until all futures are completed
results = await asyncio.gather(*[f for f, _ in tasks])
assert results == [r for _, r in tasks]
# destroy tasks
for c in consumers:
c.cancel()
asyncio.get_event_loop().run_until_complete(producer())