Многопоточность для Python Django
Некоторые функции должны запускаться асинхронно на веб-сервере. Отправка электронных писем или пост-обработка данных являются типичными случаями использования.
Каков наилучший (или самый питонический) способ написать функцию декоратора для асинхронного запуска функции?
Моя установка обычная: Python, Django, Gunicorn или Waitress, стандарт AWS EC2 Linux
Например, вот начало:
from threading import Thread
def postpone(function):
def decorator(*args, **kwargs):
t = Thread(target = function, args=args, kwargs=kwargs)
t.daemon = True
t.start()
return decorator
желаемое использование:
@postpone
def foo():
pass #do stuff
Ответы
Ответ 1
Я продолжил использовать эту реализацию в масштабе и в производстве без проблем.
Определение декоратора:
def start_new_thread(function):
def decorator(*args, **kwargs):
t = Thread(target = function, args=args, kwargs=kwargs)
t.daemon = True
t.start()
return decorator
Пример использования:
@start_new_thread
def foo():
#do stuff
Со временем стек обновился и перешел без сбоев.
Первоначально Python 2.4.7, Django 1.4, Gunicorn 0.17.2, теперь Python 3.6, Django 2.1, Waitress 1.1.
Если вы используете какие-либо транзакции базы данных, Django создаст новое соединение, и его необходимо закрыть вручную:
from django.db import connection
@postpone
def foo():
#do stuff
connection.close()
Ответ 2
Celery является асинхронной очередью очереди задач/заданий. Это хорошо документировано и идеально подходит для вас. Я предлагаю вам начать здесь
Ответ 3
Наиболее распространенным способом обработки асинхронной обработки в Django является использование Celery и django-celery
.
Ответ 4
Подход TomCounsell работает хорошо, если не слишком много входящих рабочих мест. Если много длительных заданий выполняются за короткий промежуток времени, что порождает много потоков, основной процесс пострадает. В этом случае вы можете использовать пул потоков с сопрограммой,
# in my_utils.py
from concurrent.futures import ThreadPoolExecutor
MAX_THREADS = 10
def run_thread_pool():
"""
Note that this is not a normal function, but a coroutine.
All jobs are enqueued first before executed and there can be
no more than 10 threads that run at any time point.
"""
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
while True:
func, args, kwargs = yield
executor.submit(func, *args, **kwargs)
pool_wrapper = run_thread_pool()
# Advance the coroutine to the first yield (priming)
next(pool_wrapper)
from my_utils import pool_wrapper
def job(*args, **kwargs):
# do something
def handle(request):
# make args and kwargs
pool_wrapper.send((job, args, kwargs))
# return a response
Ответ 5
GET/api/mx_catalog/v1/ismyagenda/? Content_id = 1 HTTP 404 не найден Тип содержимого: application/json Варьировать: Принять Разрешить: ОПЦИИ, GET
{"status": false} GET/api/mx_catalog/v1/ismyagenda/? content_id = 1 HTTP 404 Не найдено Тип содержимого: приложение /json Зависит: Принять, разрешить: OPTIONS, GET
{"status": false}