Брокер In-Memory для тестов на сельдерей
У меня есть API REST, написанный на Django, с конечной точкой и конечной точкой, которая ставит задачу по сельдеру при отправке на нее. Ответ содержит идентификатор задачи, который я хотел бы использовать для проверки того, что задача создана и получить результат. Итак, я хотел бы сделать что-то вроде:
def test_async_job():
response = self.client.post("/api/jobs/", some_test_data, format="json")
task_id = response.data['task_id']
result = my_task.AsyncResult(task_id).get()
self.assertEquals(result, ...)
Я, очевидно, не хочу запускать работника сельдерея для запуска модульных тестов, я ожидаю, что он каким-то образом издевается над ним. Я не могу использовать CELERY_ALWAYS_EAGER, потому что это, похоже, вообще обходит брокера, не позволяя мне использовать AsyncResult, чтобы получить задание по его идентификатору (как указано здесь).
Пройдя через сельдерей и kombu docs, я обнаружил, что в модульных тестах есть транспорт в памяти, который будет делать то, что Я ищу. Я попытался переопределить параметр BROKER_URL
, чтобы использовать его в тестах:
@override_settings(BROKER_URL='memory://')
def test_async_job():
Но поведение такое же, как и с ampq broker: он блокирует тест, ожидающий результата. Любая идея, как я должен настроить этого брокера, чтобы заставить его работать в тестах?
Ответы
Ответ 1
Вы можете указать broker_backend в своих настройках:
if 'test' in sys.argv[1:]:
BROKER_BACKEND = 'memory'
CELERY_ALWAYS_EAGER = True
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
или вы можете переопределить настройки с помощью декоратора непосредственно в своем тесте
import unittest
from django.test.utils import override_settings
class MyTestCase(unittest.TestCase):
@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
CELERY_ALWAYS_EAGER=True,
BROKER_BACKEND='memory')
def test_mytask(self):
...
Ответ 2
Вы можете использовать брокером в блоке Kombu для запуска модульных тестов, однако для этого вам нужно развернуть работника Celery, используя тот же объект приложения Celery, что и сервер Django.
Чтобы использовать брокер в памяти, установите BROKER_URL на memory://localhost/
Затем, чтобы развернуть маленького работника сельдерея, вы можете сделать следующее:
app = <Django Celery App>
# Set the worker up to run in-place instead of using a pool
app.conf.CELERYD_CONCURRENCY = 1
app.conf.CELERYD_POOL = 'solo'
# Code to start the worker
def run_worker():
app.worker_main()
# Create a thread and run the worker in it
import threading
t = threading.Thread(target=run_worker)
t.setDaemon(True)
t.start()
Вам нужно убедиться, что вы используете то же приложение, что и экземпляр приложения для фермеров Django.
Обратите внимание, что запуск работника будет печатать много вещей и изменять настройки ведения журнала.
Ответ 3
Здесь представлен более полнофункциональный пример Django TransactionTestCase
, который работает с Celery 4.x.
import threading
from django.test import TransactionTestCase
from django.db import connections
from myproj.celery import app # your Celery app
class CeleryTestCase(TransactionTestCase):
"""Test case with Celery support."""
@classmethod
def setUpClass(cls):
super().setUpClass()
app.control.purge()
cls._worker = app.Worker(app=app, pool='solo', concurrency=1)
connections.close_all()
cls._thread = threading.Thread(target=cls._worker.start)
cls._thread.daemon = True
cls._thread.start()
@classmethod
def tearDownClass(cls):
cls._worker.stop()
super().tearDownClass()
Помните, что это не изменяет ваши имена очередей на очереди тестирования, поэтому, если вы также запускаете приложение, вы тоже захотите сделать это.