Как создать условную задачу в Airflow

Я хотел бы создать условную задачу в Airflow, как описано в приведенной ниже схеме. Ожидаемый сценарий следующий:

  • Задание 1 выполняется
  • Если задача 1 выполнена успешно, выполните задачу 2a
  • Else Если задача 1 завершается с ошибкой, выполните задачу 2b
  • Наконец выполнить задачу 3

Условная задача Все вышеперечисленные задачи - SSHExecuteOperator. Я предполагаю, что я должен использовать ShortCircuitOperator и/или XCom для управления этим условием, но я не понимаю, как это реализовать. Не могли бы вы описать решение?

Ответы

Ответ 1

Вы должны использовать правила триггера воздушного потока

Все операторы имеют аргумент trigger_rule, который определяет правило, с помощью которого генерируется сгенерированная задача.

Возможны триггерные возможности:

ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'

Вот вам идея решить вашу проблему:

from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook

sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)

task_1 = SSHExecuteOperator(
        task_id='task_1',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2 = SSHExecuteOperator(
        task_id='conditional_task',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2a = SSHExecuteOperator(
        task_id='task_2a',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)

task_2b = SSHExecuteOperator(
        task_id='task_2b',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_FAILED,
        ssh_hook=sshHook,
        dag=dag)

task_3 = SSHExecuteOperator(
        task_id='task_3',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ONE_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)


task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)

Ответ 2

Airflow имеет BranchPythonOperator, который может использоваться для более прямого выражения зависимости от ветвления.

docs описывают его использование:

BranchPythonOperator очень похож на PythonOperator, за исключением того, что он ожидает, что python_callable возвращает task_id. Возвращается task_id, и все остальные пути пропускаются. Функция task_id, возвращаемая функцией Python, должна ссылаться на задачу непосредственно по течению от задачи BranchPythonOperator.

...

Если вы хотите пропустить некоторые задачи, имейте в виду, что у вас нет пустого пути, если это так, создайте фиктивную задачу.

введите описание изображения здесь