Планировщик воздушного потока медленнее планировать последующие задачи

Когда я пытаюсь запустить DAG в Airflow 1.8.0, я нахожу, что это занимает много времени между временем выполнения задачи предшественника и временем, когда задача преемника подбирается для выполнения (обычно это увеличивает время выполнения отдельных задач). То же самое и сценарий для последовательных, локальных и сельдерейских исполнителей. Есть ли способ уменьшить упомянутое выше накладное время? (как и любые параметры в airflow.cfg, которые могут ускорить выполнение DAG?) Гантальная диаграмма была добавлена для справки: Gantt chart

Ответы

Ответ 1

Как сказал Ник, Airflow - это не инструмент реального времени. Задачи планируются и выполняются как можно скорее, но следующая задача никогда не будет запущена сразу после последней.

Если у вас есть более ~ 100 групп DAG с ~ 3 задачами в каждой или Dags с множеством задач (~ 100 или более), вы должны учитывать 3 вещи:

  1. Увеличьте количество потоков, которые DagFileProcessorManager будет использовать для загрузки и выполнения Dags (airflow.cfg):

[scheduler]

max_threads = 2

Параметр max_threads управляет количеством DAG, которые выбираются, исполняются/закрываются (см. Здесь).

Увеличение этой конфигурации может сократить время между заданиями.

  1. Контролируйте свою базу данных Airflow, чтобы увидеть, есть ли в ней узкие места. База данных Airflow используется для управления и выполнения процессов:

Недавно мы страдали от той же проблемы. Время между заданиями составляло ~ 10-15 минут, мы использовали PostgreSQL на AWS.

Экземпляр не очень хорошо использовал ресурсы; ~ 20 операций ввода-вывода в секунду, 20% памяти и ~ 10% процессора, но поток воздуха был очень медленным.

Изучив производительность базы данных с помощью PgHero, мы обнаружили, что даже запрос, использующий индекс для небольшой таблицы, тратит более одной секунды.

Таким образом, мы увеличили размер базы данных, и теперь Airflow работает так же быстро, как ракета. :)

  1. Чтобы узнать время, которое Airflow тратит на загрузку Dags, выполните команду:

airflow list_dags -r

DagBag parsing time: 7.9497220000000075

Если время разбора DagBag превышает ~ 5 минут, это может быть проблемой.

Все это помогло нам запустить Airflow быстрее. Я действительно советую вам обновиться до версии 1.9, так как в этой версии исправлено много проблем с производительностью

Кстати, мы используем Airflow master в производстве, с LocalExecutor и PostgreSQL в качестве базы данных метаданных.

Ответ 2

Ваша диаграмма Ганта показывает вещи в порядке секунд. Воздушный поток не предназначен для работы в режиме реального времени. Он имеет дело с порядком минут. Если вам нужно что-то ускорить, вы можете рассмотреть другой инструмент планирования из воздушного потока. В качестве альтернативы вы можете поместить всю работу в одну задачу, чтобы не страдать от задержки планировщика.

Ответ 3

Я должен был исправить код заполнения дага, потому что каждый рабочий потратил более 30 секунд, заполнив сумку. Проблема связана с кодом models.py detect_downstream_cycle, который занимает много времени. В моем тестировании с использованием команды list_dags здесь приведены мои результаты: