Задачи Airflow застревают в статусе "queued" и никогда не запускаются
Я использую Airflow v1.8.1 и запускаю все компоненты (рабочий, веб, цветок, планировщик) на кубернетах и докере.
Я использую Celery Executor с Redis, и мои задачи выглядят так:
(start) -> (do_work_for_product1)
├ -> (do_work_for_product2)
├ -> (do_work_for_product3)
├ …
Итак, задача start
имеет несколько нисходящих потоков.
И я настраиваю конфигурацию concurrency, как показано ниже:
parallelism = 3
dag_concurrency = 3
max_active_runs = 1
Затем, когда я запускаю эту DAG вручную (не уверен, что это никогда не происходит в запланированной задаче), некоторые нисходящие потоки выполняются, а другие застревают в статусе "queued".
Если я очистил задачу от пользовательского интерфейса администратора, она будет выполнена.
Нет рабочего журнала (после обработки некоторых первых потоков, он просто не выводит ни одного журнала).
Журнал веб-сервера (не уверен worker exiting
)
/usr/local/lib/python2.7/dist-packages/flask/exthook.py:71: ExtDeprecationWarning: Importing flask.ext.cache is deprecated, use flask_cache instead.
.format(x=modname), ExtDeprecationWarning
[2017-08-24 04:20:56,496] [51] {models.py:168} INFO - Filling up the DagBag from /usr/local/airflow_dags
[2017-08-24 04:20:57 +0000] [27] [INFO] Handling signal: ttou
[2017-08-24 04:20:57 +0000] [37] [INFO] Worker exiting (pid: 37)
Также нет планировщика ошибок в планировщике. И ряд задач застревает, меняется каждый раз, когда я пытаюсь это сделать.
Потому что я также использую Docker. Мне интересно, связано ли это:
https://github.com/puckel/docker-airflow/issues/94
Но пока нет подсказок.
Кто-нибудь сталкивался с подобной проблемой или имел некоторое представление о том, что я могу расследовать по этой проблеме...?
Ответы
Ответ 1
Задачи, застревающие, скорее всего, ошибка. В настоящий момент (< = 1,9,0alpha1) это может произойти, когда задача не может даже запускаться на (удаленном) рабочем месте. Это происходит, например, в случае перегруженного рабочего или отсутствующих зависимостей.
Этот патч должен решить эту проблему.
Стоит исследовать, почему ваши задачи не получают состояния RUNNING. Установка себя на это состояние - это, во-первых, задача. Обычно рабочий регистрируется до его запуска, а также отчеты и ошибки. Вы должны быть в состоянии найти записи этого в журнале задач.
edit. Как уже упоминалось в комментариях к исходному вопросу, если один пример воздушного потока, который не может выполнить задачу, - это когда он не может писать в требуемые места. Это делает его невозможным, и задачи застряли. Патч исправляет это, не выполнив задачу из планировщика.
Ответ 2
У нас есть решение и мы хотим поделиться им здесь до того, как 1,9 станет официальным. Спасибо за обновления Bolke de Bruin 1.9. в моей ситуации до 1,9, в настоящее время мы используем 1.8.1, чтобы иметь еще одну DAG, выполняющую очистку задачи в queue state
, если она остается там более 30 минут.