Сельдерей, создающий новое соединение для каждой задачи
Я использую Celery с Redis для запуска некоторых фоновых задач, но каждый раз, когда вызывается задача, он создает новое соединение с Redis. Я нахожусь на Heroku, и мой план Redis to Go предусматривает 10 подключений. Я быстро нажимаю этот предел и получаю ошибку "максимальное количество клиентов".
Как я могу гарантировать, что Celery ставит задачи в одном соединении, а не открывает каждый новый?
EDIT - включая полную трассировку
File "/app/.heroku/venv/lib/python2.7/site-packages/django/core/handlers/base.py", line 111, in get_response
response = callback(request, *callback_args, **callback_kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__
self._nr_instance, args, kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/hooks/framework_django.py", line 447, in wrapper
return wrapped(*args, **kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/django/views/decorators/csrf.py", line 77, in wrapped_view
return view_func(*args, **kwargs)
File "/app/feedback/views.py", line 264, in zencoder_webhook_handler
tasks.process_zencoder_notification.delay(webhook)
File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/task.py", line 343, in delay
return self.apply_async(args, kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/task.py", line 458, in apply_async
with app.producer_or_acquire(producer) as P:
File "/usr/local/lib/python2.7/contextlib.py", line 17, in __enter__
return self.gen.next()
File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/base.py", line 247, in producer_or_acquire
with self.amqp.producer_pool.acquire(block=True) as producer:
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 705, in acquire
R = self.prepare(R)
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 54, in prepare
p = p()
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 45, in <lambda>
return lambda: self.create_producer()
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 42, in create_producer
return self.Producer(self._acquire_connection())
File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 160, in __init__
super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/messaging.py", line 83, in __init__
self.revive(self.channel)
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/messaging.py", line 174, in revive
channel = self.channel = maybe_channel(channel)
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 879, in maybe_channel
return channel.default_channel
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 617, in default_channel
self.connection
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 610, in connection
self._connection = self._establish_connection()
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 569, in _establish_connection
conn = self.transport.establish_connection()
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/virtual/__init__.py", line 722, in establish_connection
self._avail_channels.append(self.create_channel(self))
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/virtual/__init__.py", line 705, in create_channel
channel = self.Channel(connection)
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/redis.py", line 271, in __init__
self.client.info()
File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__
self._nr_instance, args, kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/function_trace.py", line 81, in literal_wrapper
return wrapped(*args, **kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/redis/client.py", line 344, in info
return self.execute_command('INFO')
File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/redis.py", line 536, in execute_command
conn.send_command(*args)
File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 273, in send_command
self.send_packed_command(self.pack_command(*args))
File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 256, in send_packed_command
self.connect()
File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__
self._nr_instance, args, kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/function_trace.py", line 81, in literal_wrapper
return wrapped(*args, **kwargs)
File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 207, in connect
self.on_connect()
File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 233, in on_connect
if self.read_response() != 'OK':
File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 283, in read_response
raise response
ResponseError: max number of clients reached
Ответы
Ответ 1
Я хотел бы использовать Redis, потому что существует определенная опция для ограничения количества подключений: CELERY_REDIS_MAX_CONNECTIONS
.
MongoDB
имеет аналогичную настройку.
Учитывая эти параметры бэкэнд, я понятия не имею, что делает BROKER_POOL_LIMIT
. Надеюсь, CELERY_REDIS_MAX_CONNECTIONS
решает вашу проблему.
Я один из тех людей, которые используют CloudAMQP, а бэкэнд AMQP не имеет собственного параметра ограничения соединения.
Ответ 2
Я столкнулся с той же проблемой на Heroku с CloudAMQP. Я не знаю почему, но мне не повезло при назначении низких целых чисел в настройке BROKER_POOL_LIMIT
.
В конечном итоге я обнаружил, что, установив BROKER_POOL_LIMIT=None
или BROKER_POOL_LIMIT=0
, моя проблема была смягчена. Согласно документам Celery, это отключает пул соединений. До сих пор это не было заметной проблемой для меня, однако я не уверен, может ли это быть для вас.
Ссылка на соответствующую информацию: http://celery.readthedocs.org/en/latest/configuration.html#broker-pool-limit
Ответ 3
Попробуйте выполнить следующие настройки:
CELERY_IGNORE_RESULT = True
CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
Ответ 4
У меня была аналогичная проблема, связанная с количеством соединений и Celery. Это было не на Героку, но это был Монго, а не Редис.
Я инициировал соединение вне определения функции задачи на уровне модуля задач. По крайней мере, для Mongo это позволило задачам разделить соединение.
Надеюсь, что это поможет.
https://github.com/instituteofdesign/wander/blob/master/wander/tasks.py
mongoengine.connect('stored_messages')
@celery.task(default_retry_delay = 61)
def pull(settings, google_settings, user, folder, messageid):
'''
Pulls a message from zimbra and stores it in Mongo
'''
try:
imap = imap_connect(settings, user)
imap.select(folder, True)
.......