Многопоточность внутри работника сельдерея
Я использую Celery с RabbitMQ для обработки данных из запросов API. Процесс выполняется следующим образом:
Запрос → API → RabbitMQ → Работник сельдерея → Возврат
В идеале я бы породил больше работников сельдерея, но меня ограничивают ограничения памяти.
В настоящее время узким местом в моем процессе является выборка и загрузка данных из URL-адресов, переданных в рабочий. Roughy, процесс выглядит следующим образом:
celery_gets_job(url):
var data = fetches_url(url) # takes 0.1s to 1.0s (bottleneck)
var result = processes_data(data) # takes 0.1ss
return result
Это неприемлемо, так как работник некоторое время блокируется при получении URL-адреса. Я смотрю на улучшение этого путем потоковой передачи, но я не уверен, что лучшие методы:
-
Есть ли способ заставить сельдерей загружать входящие данные асинхронно, одновременно обрабатывая данные в другом потоке?
-
Должен ли я иметь отдельных рабочих, получающих и обрабатывающих, с некоторой формой передачи сообщений, возможно через RabbitMQ?
Ответы
Ответ 1
Используя библиотеку eventlet
, вы можете исправлять стандартные библиотеки для их асинхронности.
Сначала импортируйте async urllib2:
from eventlet.green import urllib2
Итак, вы получите тело url:
def fetch(url):
body = urllib2.urlopen(url).read()
return body
Подробнее eventlet
примеры здесь.
Ответ 2
Я бы создал две задачи: одну для загрузки данных, а другую для ее обработки после ее загрузки. Таким образом, вы можете масштабировать две задачи самостоятельно. Смотрите: Routing, Chains.