Возможно ли, чтобы планировщик Airflow сначала завершил цикл предыдущего дня, прежде чем начать следующий?
Прямо сейчас узлы в моей группе обеспечения доступности баз данных переходят к задаче следующего дня, прежде чем остальные узлы этой группы обеспечения доступности баз данных завершают свою работу. Есть ли способ дождаться завершения остальной части группы DAG, прежде чем перейти к следующему циклу DAG?
(У меня есть завихрение зависимости от_on_past, но в этом случае это не работает)
Мой DAG выглядит так:
O
l
V
O -> O -> O -> O -> O
Кроме того, изображение дерева в виде даг]
![tree view pic of the dag]()
Ответы
Ответ 1
Возможно, я немного опоздаю с этим ответом, но я столкнулся с той же проблемой, и, как я решил, я добавил две дополнительные задачи в каждый тег. "Предыдущий" в начале и "Завершить" в конце. Предыдущая задача - это внешний датчик задачи, который контролирует предыдущую работу. Complete - просто фиктивный оператор. Допустим, он запускается каждые 30 минут, поэтому даг будет выглядеть так:
dag = DAG(dag_id='TEST_DAG', default_args=default_args, schedule_interval=timedelta(minutes=30))
PREVIOUS = ExternalTaskSensor(
task_id='Previous_Run',
external_dag_id='TEST_DAG',
external_task_id='All_Tasks_Completed',
allowed_states=['success'],
execution_delta=timedelta(minutes=30),
dag=DAG
)
T1 = BashOperator(
task_id='TASK_01',
bash_command='echo "Hello World from Task 1"',
dag=dag
)
COMPLETE = DummyOperator(
task_id='All_Tasks_Completed',
dag=DAG
)
PREVIOUS >> T1 >> COMPLETE
Таким образом, следующий шаг, даже если он попадет в очередь, не позволит запускать задачи до тех пор, пока не завершится PREVIOUS.
Ответ 2
если вы хотите просто запустить один экземпляр за раз, затем попробуйте установить
max_active_runs = 1
Ответ 3
Используйте depends_on_past
и wait_for_downstream
в своей DAG.
Изменить: при тестировании два приведенных выше аргумента фактически не ожидают завершения всего прогона. Для этого нам пришлось написать настраиваемый датчик.
Ответ 4
В итоге у меня получилось сочетание
- Добавление зависимостей задачи: wait_for_downstream = True, depen_on_past = True
- Добавление max_active_runs: 1 к при создании метки. Я попытался добавить max_active_runs в качестве аргумента по умолчанию, но это не сработало.