GAE: тестовое задание с тестовым стендом
Я использую testbed для unit test моего приложения для приложений Google, а мое приложение использует задачу.
Когда я отправляю задачу на задание во время unit test, кажется, что задача находится в очереди, но задача не выполняется.
Как мне заставить задачу выполнить во время unit test?
Ответы
Ответ 1
Сервер приложений Dev однопоточен, поэтому он не может запускать задачи в фоновом режиме, в то время как поток переднего плана запускает тесты.
Я изменил TaskQueueTestCase в taskqueue.py в gaetestbed, чтобы добавить следующую функцию:
def execute_tasks(self, application):
"""
Executes all currently queued tasks, and also removes them from the
queue.
The tasks are execute against the provided web application.
"""
# Set up the application for webtest to use (resetting _app in case a
# different one has been used before).
self._app = None
self.APPLICATION = application
# Get all of the tasks, and then clear them.
tasks = self.get_tasks()
self.clear_task_queue()
# Run each of the tasks, checking that they succeeded.
for task in tasks:
response = self.post(task['url'], task['params'])
self.assertOK(response)
Для этого мне также пришлось изменить базовый класс TaskQueueTestCase от BaseTestCase до WebTestCase.
Мои тесты затем делают что-то вроде этого:
# Do something which enqueues a task.
# Check that a task was enqueued, then execute it.
self.assertTrue(len(self.get_tasks()), 1)
self.execute_tasks(some_module.application)
# Now test that the task did what was expected.
Таким образом, эта задача выполняется непосредственно с переднего плана unit test. Это не совсем то же самое, что и в производстве (т.е. Задача будет выполняться "через некоторое время" по отдельному запросу), но для меня это работает достаточно хорошо.
Ответ 2
Используя Saxon отличный ответ, я смог сделать то же самое, используя testbed вместо gaetestbed. Вот что я сделал.
Добавил это к моему setUp()
:
self.taskqueue_stub = apiproxy_stub_map.apiproxy.GetStub('taskqueue')
Затем в моем тесте я использовал следующее:
# Execute the task in the taskqueue
tasks = self.taskqueue_stub.GetTasks("default")
self.assertEqual(len(tasks), 1)
task = tasks[0]
params = base64.b64decode(task["body"])
response = self.app.post(task["url"], params)
Где-то вдоль линии параметры POST получают base64, поэтому пришлось отменить это, чтобы заставить его работать.
Мне нравится это лучше, чем саксонский ответ, поскольку я могу использовать официальный тестовый пакет, и я могу сделать все это в своем собственном тестовом коде.
EDIT: позже я хотел сделать то же самое с задачами, представленными с использованием отложенной библиотеки, и потребовалось немного поворота, чтобы понять это, поэтому я делюсь здесь, чтобы облегчить боль другим людям.
Если ваше задание содержит только задания, отправленные с отсрочкой, тогда будут выполняться все задачи и задачи, поставленные в очередь этими задачами:
def submit_deferred(taskq):
tasks = taskq.GetTasks("default")
taskq.FlushQueue("default")
while tasks:
for task in tasks:
(func, args, opts) = pickle.loads(base64.b64decode(task["body"]))
func(*args)
tasks = taskq.GetTasks("default")
taskq.FlushQueue("default")
Ответ 3
Другим (более чистым) вариантом для этого является использование заглушки очереди задач в тестовом стенде. Для этого сначала нужно инициализировать заглушку очереди задач, добавив следующее к вашему методу setUp()
:
self.testbed = init_testbed()
self.testbed.init_taskqueue_stub()
Доступ к планировщику задач можно получить с помощью следующего кода:
taskq = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)
Интерфейс для работы с заглушкой очереди выглядит следующим образом:
GetQueues() #returns a list of dictionaries with information about the available queues
#returns a list of dictionaries with information about the tasks in a given queue
GetTasks(queue_name)
DeleteTask(queue_name, task_name) #removes the task task_name from the given queue
FlushQueue(queue_name) #removes all the tasks from the queue
#returns tasks filtered by name & url pointed to by the task from the given queues
get_filtered_tasks(url, name, queue_names)
StartBackgroundExecution() #Executes the queued tasks
Shutdown() #Requests the task scheduler to shutdown.
Кроме того, поскольку это использует собственные средства App Engine SDK - он отлично работает с отложенной библиотекой.
Ответ 4
Возможно, вы захотите попробовать следующий код. Полное объяснение здесь: http://www.geewax.org/task-queue-support-in-app-engines-ext-testbed/
import unittest
from google.appengine.api import taskqueue
from google.appengine.ext import testbed
class TaskQueueTestCase(unittest.TestCase):
def setUp(self):
self.testbed = testbed.Testbed()
self.testbed.activate()
self.testbed.init_taskqueue_stub()
self.task_queue_stub = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)
def tearDown(self):
self.testbed.deactivate()
def testTaskAdded(self):
taskqueue.add(url='/path/to/task')
tasks = self.taskqueue_stub.get_filtered_tasks(url='/path/to/task')
self.assertEqual(1, len(tasks))
self.assertEqual('/path/to/task', tasks[0].url)
unittest.main()