Прочитать файл по строкам с помощью asyncio
Я хочу прочитать несколько файлов журнала, когда они написаны, и обрабатывать их ввод с помощью asyncio. Код должен запускаться в окнах. Из того, что я понимаю из поиска как stackoverflow, так и Интернета, асинхронный ввод-вывод файлов сложен для большинства операционных систем (select
не будет работать, как предполагалось, например). Хотя я уверен, что могу сделать это с помощью других методов (например, потоков), я бы попробовал асинхронно, чтобы понять, что это такое. Наиболее полезным ответом, вероятно, будет тот, который описывает, как должна выглядеть "архитектура" решения этой проблемы, т.е. Как вызывать или планировать различные функции и сопрограммы.
Ниже приведен генератор, который читает файлы по строкам (через опрос, который является приемлемым):
import time
def line_reader(f):
while True:
line = f.readline()
if not line:
time.sleep(POLL_INTERVAL)
continue
process_line(line)
С несколькими файлами для мониторинга и обработки этот тип кода потребует потоков. Я немного изменил его, чтобы использовать его в asyncio:
import asyncio
def line_reader(f):
while True:
line = f.readline()
if not line:
yield from asyncio.sleep(POLL_INTERVAL)
continue
process_line(line)
Этот вид работает, когда я планирую его через цикл событий asyncio, но если process_data
блокирует, то это, конечно, не очень хорошо. Когда я начинал, я думал, что решение будет выглядеть примерно так:
def process_data():
...
while True:
...
line = yield from line_reader()
...
но я не мог понять, как сделать эту работу (по крайней мере, не без process_data
управления довольно немного состояния).
Любые идеи о том, как я должен структурировать такой код?
Ответы
Ответ 1
Из того, что я понимаю из поиска как в stackoverflow, так и в Интернете, асинхронный ввод-вывод файлов сложен для большинства операционных систем (выбор не будет работать, как предполагалось, например). Хотя я уверен, что могу сделать это с помощью других методов (например, потоков), я бы попробовал асинхронно, чтобы понять, что это такое.
asyncio
является select
на основе * nix систем под капотом, поэтому вы не сможете делать неблокирующий файловый ввод-вывод без использования потоков. В Windows asyncio
может использовать IOCP, который поддерживает неблокирующий ввод-вывод файлов, но это не поддерживается asyncio
.
Ваш код в порядке, за исключением того, что вы должны блокировать вызовы ввода-вывода в потоках, чтобы вы не блокировали цикл событий, если ввод-вывод работает медленно. К счастью, очень просто отключить загрузку работы с потоками, используя функцию loop.run_in_executor
.
Сначала настройте выделенный пул потоков для ввода/вывода:
from concurrent.futures import ThreadPoolExecutor
io_pool_exc = ThreadPoolExecutor()
И затем просто выгрузите любые блокирующие вызовы ввода-вывода исполнителю:
...
line = yield from loop.run_in_executor(io_pool_exc, f.readline)
...
Ответ 2
Используя aiofiles:
async with aiofiles.open('filename', mode='r') as f:
async for line in f:
print(line)
РЕДАКТИРОВАТЬ 1
Как упоминалось в @Jashandeep, вы должны заботиться об операциях блокировки:
Другой метод: select
и epoll
:
from select import select
files_to_read, files_to_write, exceptions = select([f1, f2], [f1, f2], [f1, f2], timeout=.1)
Здесь важен параметр timeout
.
см. https://docs.python.org/2/library/select.html#select.select
РЕДАКТИРОВАТЬ 2
Вы можете зарегистрировать файл для чтения/записи с помощью: loop.add_reader()
Ответ 3
Ваша структура кода выглядит хорошо для меня, следующий код отлично работает на моей машине:
import asyncio
PERIOD = 0.5
@asyncio.coroutine
def readline(f):
while True:
data = f.readline()
if data:
return data
yield from asyncio.sleep(PERIOD)
@asyncio.coroutine
def test():
with open('test.txt') as f:
while True:
line = yield from readline(f)
print('Got: {!r}'.format(line))
loop = asyncio.get_event_loop()
loop.run_until_complete(test())
Ответ 4
asyncio
пока не поддерживает файловые операции, извините.
Таким образом, это не может помочь с вашей проблемой.