Динамические задачи воздушного потока во время выполнения
Другие вопросы о "динамических задачах", похоже, направлены на динамическое построение DAG по расписанию или времени разработки. Мне интересно динамически добавлять задачи в DAG во время выполнения.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
dag = DAG('test_dag', description='a test',
schedule_interval='0 0 * * *',
start_date=datetime(2018, 1, 1),
catchup=False)
def make_tasks():
du1 = DummyOperator(task_id='dummy1', dag=dag)
du2 = DummyOperator(task_id='dummy2', dag=dag)
du3 = DummyOperator(task_id='dummy3', dag=dag)
du1 >> du2 >> du3
p = PythonOperator(
task_id='python_operator',
dag=dag,
python_callable=make_tasks)
Эта наивная реализация, похоже, не работает - фиктивные задачи никогда не появляются в пользовательском интерфейсе.
Каков правильный способ добавления новых операторов в DAG во время выполнения? Возможно ли это?
Ответы
Ответ 1
Это не возможно изменить DAG во время его выполнения (без большого количества работы).
Планировщик подхватывает dag = DAG(...
в цикле. Он будет содержать экземпляр задачи 'python_operator'
. Этот экземпляр задачи будет запланирован в ходе выполнения dag и выполнен работником или исполнителем. Поскольку модели DAG в базе данных Airflow обновляются только планировщиком, эти добавленные фиктивные задачи не будут сохранены в группе обеспечения доступности баз данных и не запланированы к запуску. Они будут забыты при выходе из рабочего. Если вы не скопируете весь код из планировщика относительно сохранения и обновления модели … Но это будет отменено в следующий раз, когда планировщик посещает файл DAG для анализа, что может происходить раз в минуту, раз в секунду или быстрее, в зависимости от количества других файлов DAG, которые нужно проанализировать.
Airflow на самом деле хочет, чтобы каждая группа DAG оставалась примерно одинаковой между выполнениями. Он также хочет постоянно перезагружать/анализировать файлы DAG. Таким образом, хотя вы могли бы создать файл DAG, который при каждом запуске определяет задачи динамически на основе некоторых внешних данных (предпочтительно кэшируемых в модуле файла или pyc, а не сетевого ввода-вывода, как поиск в БД, вы замедляете весь цикл планирования для всех групп обеспечения доступности баз данных) это не очень хороший план, поскольку ваш график и древовидная структура запутаются, и ваш поиск в планировщике будет более обременительным.
Вы можете заставить вызываемый запускать каждую задачу...
def make_tasks(context):
du1 = DummyOperator(task_id='dummy1', dag=dag)
du2 = DummyOperator(task_id='dummy2', dag=dag)
du3 = DummyOperator(task_id='dummy3', dag=dag)
du1.execute(context)
du2.execute(context)
du3.execute(context)
p = PythonOperator(
provides_context=true,
Но это последовательно, и вам нужно разобраться, как использовать python, чтобы сделать их параллельными (использовать фьючерсы?), И если возникнет исключение, вся задача не будет выполнена. Также он привязан к одному исполнителю или работнику, поэтому не использует распределение задач воздушного потока (kubernetes, mesos, celery).
Другой способ работы с этим - добавить фиксированное количество задач (максимальное число) и использовать вызываемые элементы для короткого замыкания ненужных задач или выдвигать аргументы с помощью xcom для каждой из них, изменяя их поведение во время выполнения. но не меняя DAG.
Ответ 2
Что касается образца кода, вы никогда не вызываете свою функцию, которая регистрирует ваши задачи в вашей группе доступности баз данных.
Чтобы иметь какие-то динамические задачи, у вас может быть один оператор, который делает что-то другое в зависимости от какого-либо состояния, или вы можете иметь несколько операторов, которые могут быть пропущены в зависимости от состояния с помощью ShortCircuitOperator.
Ответ 3
Я ценю всю работу, проделанную всеми здесь, так как у меня такая же задача по созданию динамически структурированных групп доступности баз данных. Я сделал достаточно ошибок, чтобы не использовать программное обеспечение против его дизайна. Если я не могу проверить весь прогон в пользовательском интерфейсе и увеличивать и уменьшать масштаб, в основном использую функции воздушного потока, которые являются основной причиной, по которой я все равно его использую. Я могу просто написать многопроцессорный код внутри функции и покончить с этим.
Все это говорит о том, что мое решение состоит в том, чтобы использовать диспетчер ресурсов, такой как блокировка перенаправления, и иметь группу обеспечения доступности баз данных, которая записывает в этот диспетчер ресурсов данные о том, что запускать, как запускать и т. Д.; и иметь другие группы обеспечения доступности баз данных или группы обеспечения доступности баз данных, которые запускаются через определенные промежутки времени, опрашивая менеджер ресурсов, блокируя их перед запуском и удаляя их по окончании. Таким образом, по крайней мере, я использую поток воздуха, как и ожидалось, хотя его характеристики не совсем соответствуют моим потребностям. Я разбил проблему на более определенные куски. Решения креативны, но они противоречат дизайну и не проверены разработчиками. В частности, говорят, чтобы иметь фиксированные структурированные рабочие процессы. Я не могу обойти код, который не проверен и не соответствует дизайну, если я не переписываю основной код воздушного потока и не проверяю себя. Я понимаю, что мое решение приносит сложности с блокировкой и всем этим, но, по крайней мере, я знаю границы этого.
Ответ 4
Я думаю, что этот пост может быть полезным: https://www.linkedin.com/pulse/dynamic-workflows-airflow-kyle-bridenstine/