Воздушный поток не загружается в /usr/local/airflow/dags
Воздушный поток, кажется, пропускает даги, которые я добавил в /usr/local/airflow/dags.
Когда я бегу
airflow list_dags
Вывод показывает
[2017-08-06 17:03:47,220] {models.py:168} INFO - Filling up the DagBag from /usr/local/airflow/dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_http_operator
example_passing_params_via_test_command
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test_utils
tutorial
Но это не включает в себя dags в /usr/local/airflow/dags
ls -la /usr/local/airflow/dags/
total 20
drwxr-xr-x 3 airflow airflow 4096 Aug 6 17:08 .
drwxr-xr-x 4 airflow airflow 4096 Aug 6 16:57 ..
-rw-r--r-- 1 airflow airflow 1645 Aug 6 17:03 custom_example_bash_operator.py
drwxr-xr-x 2 airflow airflow 4096 Aug 6 17:08 __pycache__
Есть ли еще какое-то другое условие, которое необходимо выполнить для воздушного потока, чтобы идентифицировать DAG и загрузить его?
Ответы
Ответ 1
Попробуйте инициализировать поток initdb перед тем, как вы укажете dags. Это связано с тем, что в списке airflow list_dags перечислены все дагсы, присутствующие в базе данных (а не в указанной вами папке). Airflow initdb создаст запись для этих даг в базе данных.
Убедитесь, что переменная среды AIRFLOW_HOME установлена в /usr/local/airflow. Если эта переменная не задана, воздушный поток ищет провалы в папке домашнего воздушного потока, которая может отсутствовать в вашем случае.
Ответ 2
Мой dag загружается, но у меня было неправильное имя DAG. Я ожидал, что dag будет назван файлом, но имя определяется первым аргументом конструктора DAG
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(1))
Ответ 3
dag = DAG(
dag_id='example_bash_operator',
default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60))
Когда экземпляр DAG создается, он появляется по имени, указанному в атрибуте dag_id. dag_id служит уникальным идентификатором вашей DAG
Ответ 4
Файлы примеров не находятся в /usr/local/airflow/dags. Вы можете просто отключить их, отредактировав airflow.cfg (обычно в ~/airflow). установите load_examples = False
в разделе "core".
Из-за нескольких ошибок ваша группа доступности базы данных не была указана в list_dags
.
- Ваш файл DAG имеет синтаксическую проблему. Чтобы проверить это, просто запустите
python custom_example_bash_operator.py
и посмотрите, есть ли проблемы.
- Посмотрите, является ли папка стандартным путем загрузки dag. Для новой птицы я предлагаю просто создать новый файл .py и скопировать отсюда пример
https://airflow.incubator.apache.org/tutorial.html
, а затем посмотреть, появляется ли тестовая метка.
- Убедитесь, что в файле dag есть
dag = DAG('dag_name', default_args=default_args)
.
Ответ 5
Ваш
custom_example_bash_operator.py
имеет ли название DAG, отличное от других? Если да, попробуйте перезапустить планировщик или даже resetdb. Я обычно ошибаюсь в имени файла, чтобы быть именем дага, так что лучше назвать их одинаковыми.
Ответ 6
Можете ли вы поделиться тем, что находится в custom_example_bash_operator.py
? Airflow сканирует определенную магию внутри файла, чтобы определить, является ли DAG или нет. Он сканирует airflow
и для DAG
.
Кроме того, если вы используете дубликат dag_id для DAG, он будет перезаписан. Как вы, кажется, извлекаете из примера оператора bash, возможно, вы сохранили имя DAG example_bash_operator
? Попробуйте переименовать это.
Ответ 7
Проверка. Как только путь к папке dags находится внутри airflow.cfg(dags_folder), он может быть неправильным.
Ответ 8
Я обнаружил, что мне нужно перезапустить планировщик, чтобы пользовательский интерфейс мог подобрать новые теги, когда я вносил изменения в тег в моей папке пакетов. Я обнаружил, что когда я обновляю ярлыки, они появляются в списке, когда я запускаю airflow list_dags, но не в пользовательском интерфейсе, пока я не перезапущу планировщик.
Сначала попробуйте запустить:
airflow scheduler
Ответ 9
Там может быть две проблемы:
1. Проверьте имя Dag, данное во время создания объекта DAG в программе Python DAG
dag = DAG(
dag_id='Name_Of_Your_DAG',
....)
Обратите внимание, что во многих случаях имя может совпадать с уже присутствующим именем в списке групп обеспечения доступности баз данных (если вы скопировали код DAG). Если это не так, то
2. Проверьте путь к папке DAG в файле конфигурации Airflow.
Вы можете создать файл DAG в любом месте вашей системы, но вам нужно указать путь к этой папке/каталогу DAG в файле конфигурации Airflow.
Например, я создал папку DAG в домашнем каталоге, затем мне нужно отредактировать файл airflow.cfg, используя следующие команды в терминале:
создание папки DAG дома или в корневом каталоге
$mkdir ~/DAG
Редактируемый файл airflow.cfg присутствует в каталоге воздушного потока, где я установил воздушный поток
~/$cd airflow
~/airflow$nano airflow.cfg
В этом файле измените путь dags_folder к папке DAG, которую мы создали.
Если проблема не устранена, переустановите Airflow и перейдите по этой ссылке для установки Apache Airflow.
Ответ 10
Попробуйте перезапустить планировщик. Планировщик необходимо перезапустить, когда новые DAGS необходимо добавить в сумку DAG