Как использовать Flask-SQLAlchemy в задаче Сельдерея
Недавно я переключился на Celery 3.0. До этого я использовал Flask-Celery, чтобы интегрировать Celery with Flask. Хотя у него было много проблем, таких как скрытие некоторых мощных функциональных возможностей сельдерея, но это позволило мне использовать полный контекст приложения Flask и особенно Flask-SQLAlchemy.
В моих фоновых задачах я обрабатываю данные и ORM SQLAlchemy для хранения данных. Составитель Flask-Celery отказался от поддержки плагина. Плагин собирал экземпляр Flask в задаче, поэтому я мог иметь полный доступ к SQLAlchemy.
Я пытаюсь воспроизвести это поведение в файле tasks.py, но без успеха. Есть ли у вас какие-либо намеки на то, как достичь этого?
Ответы
Ответ 1
Обновление. С тех пор мы начали использовать лучший способ справиться с отладкой приложения и настроить его для каждой задачи на основе шаблона, описанного в более поздняя документация фляг.
extensions.py
import flask
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
class FlaskCelery(Celery):
def __init__(self, *args, **kwargs):
super(FlaskCelery, self).__init__(*args, **kwargs)
self.patch_task()
if 'app' in kwargs:
self.init_app(kwargs['app'])
def patch_task(self):
TaskBase = self.Task
_celery = self
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
if flask.has_app_context():
return TaskBase.__call__(self, *args, **kwargs)
else:
with _celery.app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
self.Task = ContextTask
def init_app(self, app):
self.app = app
self.config_from_object(app.config)
celery = FlaskCelery()
db = SQLAlchemy()
app.py
from flask import Flask
from extensions import celery, db
def create_app():
app = Flask()
#configure/initialize all your extensions
db.init_app(app)
celery.init_app(app)
return app
После того, как вы настроили свое приложение таким образом, вы можете запускать и использовать сельдерей без необходимости явно запускать его из контекста приложения, так как все ваши задачи будут автоматически запускаться в контексте приложения, если это необходимо, и вы не не нужно явно беспокоиться о разрыве после задачи, что является важной проблемой для управления (см. другие ответы ниже).
Старый ответ ниже, все еще работает, но не так, как чистое решение
Я предпочитаю запускать весь сельдерей в контексте приложения, создавая отдельный файл, который вызывает celery.start() с контекстом приложения. Это означает, что ваш файл задач не должен быть усеян настройкой контекста и разрывами. Он также хорошо подходит к шаблону "приложение factory".
extensions.py
from from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
db = SQLAlchemy()
celery = Celery()
tasks.py
from extensions import celery, db
from flask.globals import current_app
from celery.signals import task_postrun
@celery.task
def do_some_stuff():
current_app.logger.info("I have the application context")
#you can now use the db object from extensions
@task_postrun.connect
def close_session(*args, **kwargs):
# Flask SQLAlchemy will automatically create new sessions for you from
# a scoped session factory, given that we are maintaining the same app
# context, this ensures tasks have a fresh session (e.g. session errors
# won't propagate across tasks)
db.session.remove()
app.py
from extensions import celery, db
def create_app():
app = Flask()
#configure/initialize all your extensions
db.init_app(app)
celery.config_from_object(app.config)
return app
RunCelery.py
from app import create_app
from extensions import celery
app = create_app()
if __name__ == '__main__':
with app.app_context():
celery.start()
Ответ 2
В файле tasks.py выполните следующие действия:
from main import create_app
app = create_app()
celery = Celery(__name__)
celery.add_defaults(lambda: app.config)
@celery.task
def create_facet(project_id, **kwargs):
with app.test_request_context():
# your code
Ответ 3
Я использовал ответ Павла Гиббса с двумя отличиями. Вместо task_postrun я использовал worker_process_init. И вместо .remove() я использовал db.session.expire_all().
Я не уверен на 100%, но из того, что я понимаю, как это работает, когда Celery создает рабочий процесс, все унаследованные/разделяемые сеансы db будут истекли, а SQLAlchemy создаст новые сеансы по требованию уникальные к этому рабочему процессу.
До сих пор, похоже, я исправил свою проблему. С решением Пола, когда один рабочий закончил и удалил сеанс, другой рабочий, использующий тот же сеанс, все еще выполнял свой запрос, поэтому db.session.remove() закрыл соединение во время его использования, предоставив мне "Потерянное соединение с MySQL сервер во время запроса".
Спасибо Павлу за то, что он меня направил в правильном направлении!
Никогда не думал, что это не сработало. Я закончил с аргументом в моем приложении Flask factory, чтобы не запускать db.init_app (приложение), если его назвал Селььель. Вместо этого рабочие назовут его после того, как сельдерей разбудит их. Теперь я вижу несколько соединений в моем списке процессов MySQL.
from extensions import db
from celery.signals import worker_process_init
from flask import current_app
@worker_process_init.connect
def celery_worker_init_db(**_):
db.init_app(current_app)
Ответ 4
from flask import Flask
from werkzeug.utils import import_string
from celery.signals import worker_process_init, celeryd_init
from flask_celery import Celery
from src.app import config_from_env, create_app
celery = Celery()
def get_celery_conf():
config = import_string('src.settings')
config = {k: getattr(config, k) for k in dir(config) if k.isupper()}
config['BROKER_URL'] = config['CELERY_BROKER_URL']
return config
@celeryd_init.connect
def init_celeryd(conf=None, **kwargs):
conf.update(get_celery_conf())
@worker_process_init.connect
def init_celery_flask_app(**kwargs):
app = create_app()
app.app_context().push()
- Обновить конфигурацию сельдерея в celeryd init
- Используйте флеш-приложение factory для инициализации всех расширений фляж, включая расширение SQLAlchemy.
Таким образом, мы можем поддерживать подключение к базе данных для каждого сотрудника.
Если вы хотите запустить задачу под флагом, вы можете подклассом Task.__call__
:
class SmartTask(Task):
abstract = True
def __call__(self, *_args, **_kwargs):
with self.app.flask_app.app_context():
with self.app.flask_app.test_request_context():
result = super(SmartTask, self).__call__(*_args, **_kwargs)
return result
class SmartCelery(Celery):
def init_app(self, app):
super(SmartCelery, self).init_app(app)
self.Task = SmartTask