Как выполнять периодическую задачу с Flask в Python
Я использовал Flask, чтобы предоставить простой веб-API для моего k8055 интерфейса USB доска; довольно стандартные геттеры и клюшки, а Flask действительно облегчили мою жизнь.
Но я хочу иметь возможность регистрировать изменения состояния как/рядом, когда происходит сыворотка.
Например, если у меня есть кнопка, подключенная к плате, я могу опросить api для этого конкретного порта. Но если бы я хотел, чтобы результаты напрямую отражали результаты, независимо от того, говорил ли кто-нибудь с api, у меня было бы что-то вроде этого.
while True:
board.read()
board.digital_outputs = board.digital_inputs
board.read()
time.sleep(1)
И каждую секунду выходы будут обновляться в соответствии с входами.
Есть ли способ сделать это под фляк? Я делал подобные вещи в Twisted, но Flask слишком удобен для этого конкретного приложения, чтобы отказаться от него еще...
Спасибо.
Ответы
Ответ 1
Вы можете использовать cron для простых задач.
Создайте представление фляжки для своей задачи.
# a separate view for periodic task
@app.route('/task')
def task():
board.read()
board.digital_outputs = board.digital_inputs
Затем, используя cron, периодически загружайте с этого URL
# cron task to run each minute
0-59 * * * * run_task.sh
Если содержимое run_task.sh
wget http://localhost/task
Cron не может работать чаще, чем один раз в минуту. Если вам нужна более высокая частота (скажем, каждые 5 секунд = 12 раз в минуту), вы должны сделать это в tun_task.sh следующим образом
# loop 12 times with a delay
for i in 1 2 3 4 5 6 7 8 9 10 11 12
do
# download url in background for not to affect delay interval much
wget -b http://localhost/task
sleep 5s
done
Ответ 2
В моем приложении Flask я рассмотрел использование подхода cron, описанного Пашкой в его , schedule и APScheduler.
Я обнаружил, что APScheduler прост и служит для задачи периодического запуска задачи, поэтому продолжил работу с APScheduler.
Пример кода:
from flask import Flask, jsonify
from apscheduler.schedulers.background import BackgroundScheduler
app = Flask(__name__)
def test_job():
print('I am working...')
scheduler = BackgroundScheduler()
job = scheduler.add_job(test_job, 'interval', minutes=1)
scheduler.start()
Ответ 3
Нет поддержки задач в Flask, но вы можете использовать flask-celery или просто запустить свою функцию в отдельном потоке (greenlet).