Использование Popen в потоке блокирует каждый входящий запрос Flask-SocketIO

У меня следующая ситуация: Я получаю запрос на сервере socketio. Я отвечаю на него (socket.emit(..)) и , затем запускает что-то с тяжелой загрузкой вычислений в другой поток.

Если тяжелое вычисление вызвано subprocess.Popen (используя subprocess.PIPE), он полностью блокирует каждый входящий запрос до тех пор, пока он выполняется, хотя это происходит в отдельном потоке.

Нет проблем - в этот поток было предложено асинхронно считывать результат подпроцесса с размер буфера 1, так что между этими чтениями другие потоки имеют возможность что-то сделать. К сожалению, это не помогло мне.

Я также уже monkeypatched eventlet, и это отлично работает, если я не использую subprocess.Popen с subprocess.PIPE в потоке.

В этом примере кода вы можете видеть, что это происходит только с помощью subprocess.Popen с subprocess.PIPE. При раскомментировании #functionWithSimulatedHeavyLoad() и вместо этого комментарий functionWithHeavyLoad() все работает как шарм.

from flask import Flask
from flask.ext.socketio import SocketIO, emit
import eventlet

eventlet.monkey_patch()
app = Flask(__name__)
socketio = SocketIO(app)

import time
from threading  import Thread

@socketio.on('client command')
def response(data, type = None, nonce = None):
    socketio.emit('client response', ['foo'])
    thread = Thread(target = testThreadFunction)
    thread.daemon = True
    thread.start()

def testThreadFunction():
    #functionWithSimulatedHeavyLoad()
    functionWithHeavyLoad()

def functionWithSimulatedHeavyLoad():
    time.sleep(5)

def functionWithHeavyLoad():
    from datetime import datetime
    import subprocess
    import sys
    from queue import Queue, Empty

    ON_POSIX = 'posix' in sys.builtin_module_names

    def enqueueOutput(out, queue):
        for line in iter(out.readline, b''):
            if line == '':
                break
            queue.put(line)
        out.close()

    # just anything that takes long to be computed
    shellCommand = 'find / test'

    p = subprocess.Popen(shellCommand, universal_newlines=True, shell=True, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX)
    q = Queue()
    t = Thread(target = enqueueOutput, args = (p.stdout, q))
    t.daemon = True
    t.start()
    t.join()

    text = ''

    while True:
        try:
            line = q.get_nowait()
            text += line
            print(line)
        except Empty:
            break

    socketio.emit('client response', {'text': text})

socketio.run(app)

Клиент получает сообщение "foo" после завершения функции блокировки функции functionWithHeavyLoad(). Однако он должен получать сообщение ранее.

Этот образец может быть скопирован и вставлен в .py файл, и поведение можно мгновенно воспроизвести.

Я использую Python 3.4.3, Flask 0.10.1, flask-socketio1.2, eventlet 0.17.4

Обновление

Если я поместил это в функцию functionWithHeavyLoad, он действительно работает и все отлично:

import shlex
shellCommand = shlex.split('find / test')

popen = subprocess.Popen(shellCommand, stdout=subprocess.PIPE)

lines_iterator = iter(popen.stdout.readline, b"")
for line in lines_iterator:
    print(line)
    eventlet.sleep()

Проблема заключается в следующем: я использовал find для большой нагрузки, чтобы сделать образец более легко воспроизводимым. Однако в моем коде я использую tesseract "{0}" stdout -l deu как команду sell. Это (в отличие от find) все еще блокирует все. Является ли это скорее проблемой tesseract, чем eventlet? Но все же: как это может произойти, если это происходит в отдельном потоке, где он читает строки за строкой с помощью контекстного переключателя, когда find не блокирует?

Ответы

Ответ 1

Благодаря этому вопросу я узнал что-то новое сегодня. Eventlet предлагает версию подпроцесса и ее функции, совместимую с "зеленым", но по какой-то нечетной причине он не обещает запланировать этот модуль в стандартной библиотеке.

Ссылка на реализацию подпроцесса eventlet: https://github.com/eventlet/eventlet/blob/master/eventlet/green/subprocess.py

Глядя на eventlet patcher, модули, которые исправлены, - os, select, socket, thread, time, MySQLdb, builtins и psycopg2. В патчере нет никакой ссылки на подпроцесс.

Хорошей новостью является то, что я смог работать с Popen() в приложении, очень похожем на ваше, после того, как я заменил:

import subprocess

с:

from eventlet.green import subprocess

Но обратите внимание, что выпущенная в данный момент версия eventlet (0.17.4) не поддерживает параметр universal_newlines в Popen, вы получите сообщение об ошибке, если вы его используете. Поддержка этой опции находится в главном (здесь commit, который добавил эту опцию). Вам либо придется удалить эту опцию из вашего вызова, либо установить основную ветвь eventlet непосредственно из github.