Сельдерей - функция вызова по заданию
Я использую сельдерей с django и rabbitmq для создания очереди сообщений. У меня также есть рабочий, который происходит с другой машины. В представлении django я начинаю такой процесс:
def processtask(request, name):
args = ["ls", "-l"]
MyTask.delay(args)
return HttpResponse("Task set to execute.")
Моя задача настроена следующим образом:
class MyTask(Task):
def run(self, args):
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(out, err) = p.communicate()
return out
Теперь мой вопрос заключается в том, как теперь брокер (мой проект django) получает результат от команды "ls -l", которую рабочий выполнил на своем компьютере. Я думаю, лучше всего, чтобы работник мог вызвать функцию в брокере, когда он готов отправить результат из выполненной команды.
Я хотел бы получать результат от асинхронного пользователя, а затем обновлять веб-страницу с помощью вывода, но это в другое время. На данный момент я хотел бы получить только результат от работника.
Обновление
В настоящий момент я добавил HTTP-запрос GET, который запускается в конце задачи, уведомляя веб-приложение о том, что задача выполнена - я также отправляю task_id в http GET. Метод http GET вызывает представление django, которое создает AsyncResult и получает результат, но проблема в том, что при вызове result.get() появляется следующая ошибка:
/usr/lib64/python2.6/site-packages/django_celery-2.5.1-py2.6.egg/djcelery/managers.py:178: TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration.
"Polling results with transaction isolation level"
Любые идеи, почему? Я не использую базу данных, потому что я использую rabbitmq с AMQP.
Update.
Я бы очень хотел использовать третий вариант, который кажется лучшим вариантом - для небольших и больших возвращаемых значений. Вся моя задача выглядит так:
class MyTask(Task):
def __call__(self, *args, **kwargs):
return self.run(*args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
if self.webhost is not None:
conn = httplib.HTTPConnection(self.webhost, self.webport)
conn.request("HEAD", "/vuln/task/output/"+task_id)
def run(self, args, webhost=None, webport=None):
self.webhost = webhost
self.webport = webport
r = "This is a basic result string used for code clarity"
return r
Итак, я переопределил функцию after_return, которая также должна освободить блокировку моей задачи, так как функция run() функции уже вернула значение. В запросе HEAD я в основном вызываю функцию django, которая вызывает AsyncResult на task_id, которая должна обеспечивать результат задачи. Я использовал произвольный результат для тестирования в моем случае, поскольку он предназначен только для тестирования.
Я хотел бы знать, почему приведенный выше код не работает. Я могу использовать on_success, но я не думаю, что это будет иметь значение - или это будет?
Ответы
Ответ 1
Если вы посмотрите здесь, вы найдете следующее:
Django-celery использует MySQL для отслеживания всех задач/результатов, кролик-mq используется как коммуникационная шина в основном.
Что действительно происходит, так это то, что вы пытаетесь получить ASyncResult
рабочего, пока задача все еще запущена (задача вызвала HTTP-запрос на ваш сервер и поскольку он еще не вернулся, сеанс блокировки db от рабочего все еще активен, а строка результатов все еще заблокирована). Когда Django пытается прочитать результат задачи (его состояние и фактическое возвращаемое значение функции запуска), он обнаруживает, что строка заблокирована и выдает предупреждение.
Есть несколько способов решить эту проблему:
-
Задайте еще одну задачу сельдерея, чтобы получить результат и связать его с задачей обработки. Таким образом, исходная задача завершится, отпустите блокировку на db, а новая получит ее, прочитает результат в django и сделает все, что вам нужно. Посмотрите на документы сельдерея на этом.
-
Не беспокойтесь и просто делайте POST в Django с полным результатом обработки, прикрепленным как полезную нагрузку, вместо того, чтобы пытаться извлечь его через db.
-
Переопределите on_success в своем классе задач и отправьте запрос на уведомление в Django, после чего блокировка должна быть выпущена в таблице db.
Обратите внимание, что вам нужно сохранить весь результат обработки (независимо от того, насколько он велико) в возврате метода run (возможно, маринованного). Вы не указали, насколько велика может быть результат, поэтому может иметь смысл фактически сделать сценарий № 2 выше (что я и сделаю). В качестве альтернативы я бы пошел с №3. Также не забывайте обрабатывать метод on_failure, а также в своей задаче.