Воздушный поток не загружается в /usr/local/airflow/dags

Воздушный поток, кажется, пропускает даги, которые я добавил в /usr/local/airflow/dags.

Когда я бегу

airflow list_dags

Вывод показывает

[2017-08-06 17:03:47,220] {models.py:168} INFO - Filling up the DagBag from /usr/local/airflow/dags


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_http_operator
example_passing_params_via_test_command
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test_utils
tutorial

Но это не включает в себя dags в /usr/local/airflow/dags

ls -la /usr/local/airflow/dags/
total 20
drwxr-xr-x 3 airflow airflow 4096 Aug  6 17:08 .
drwxr-xr-x 4 airflow airflow 4096 Aug  6 16:57 ..
-rw-r--r-- 1 airflow airflow 1645 Aug  6 17:03 custom_example_bash_operator.py
drwxr-xr-x 2 airflow airflow 4096 Aug  6 17:08 __pycache__

Есть ли еще какое-то другое условие, которое необходимо выполнить для воздушного потока, чтобы идентифицировать DAG и загрузить его?

Ответы

Ответ 1

Попробуйте инициализировать поток initdb перед тем, как вы укажете dags. Это связано с тем, что в списке airflow list_dags перечислены все дагсы, присутствующие в базе данных (а не в указанной вами папке). Airflow initdb создаст запись для этих даг в базе данных.

Убедитесь, что переменная среды AIRFLOW_HOME установлена в /usr/local/airflow. Если эта переменная не задана, воздушный поток ищет провалы в папке домашнего воздушного потока, которая может отсутствовать в вашем случае.

Ответ 2

Мой dag загружается, но у меня было неправильное имя DAG. Я ожидал, что dag будет назван файлом, но имя определяется первым аргументом конструктора DAG

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(1))

Ответ 3

dag = DAG(
    dag_id='example_bash_operator', 
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60))

Когда экземпляр DAG создается, он появляется по имени, указанному в атрибуте dag_id. dag_id служит уникальным идентификатором вашей DAG

Ответ 4

Файлы примеров не находятся в /usr/local/airflow/dags. Вы можете просто отключить их, отредактировав airflow.cfg (обычно в ~/airflow). установите load_examples = False в разделе "core".

Из-за нескольких ошибок ваша группа доступности базы данных не была указана в list_dags.

  1. Ваш файл DAG имеет синтаксическую проблему. Чтобы проверить это, просто запустите python custom_example_bash_operator.py и посмотрите, есть ли проблемы.
  2. Посмотрите, является ли папка стандартным путем загрузки dag. Для новой птицы я предлагаю просто создать новый файл .py и скопировать отсюда пример https://airflow.incubator.apache.org/tutorial.html, а затем посмотреть, появляется ли тестовая метка.
  3. Убедитесь, что в файле dag есть dag = DAG('dag_name', default_args=default_args).

Ответ 5

Ваш

custom_example_bash_operator.py

имеет ли название DAG, отличное от других? Если да, попробуйте перезапустить планировщик или даже resetdb. Я обычно ошибаюсь в имени файла, чтобы быть именем дага, так что лучше назвать их одинаковыми.

Ответ 6

Можете ли вы поделиться тем, что находится в custom_example_bash_operator.py? Airflow сканирует определенную магию внутри файла, чтобы определить, является ли DAG или нет. Он сканирует airflow и для DAG.

Кроме того, если вы используете дубликат dag_id для DAG, он будет перезаписан. Как вы, кажется, извлекаете из примера оператора bash, возможно, вы сохранили имя DAG example_bash_operator? Попробуйте переименовать это.

Ответ 7

Проверка. Как только путь к папке dags находится внутри airflow.cfg(dags_folder), он может быть неправильным.

Ответ 8

Я обнаружил, что мне нужно перезапустить планировщик, чтобы пользовательский интерфейс мог подобрать новые теги, когда я вносил изменения в тег в моей папке пакетов. Я обнаружил, что когда я обновляю ярлыки, они появляются в списке, когда я запускаю airflow list_dags, но не в пользовательском интерфейсе, пока я не перезапущу планировщик.

Сначала попробуйте запустить:

airflow scheduler

Ответ 9

Там может быть две проблемы: 1. Проверьте имя Dag, данное во время создания объекта DAG в программе Python DAG

dag = DAG(
dag_id='Name_Of_Your_DAG', 
....)

Обратите внимание, что во многих случаях имя может совпадать с уже присутствующим именем в списке групп обеспечения доступности баз данных (если вы скопировали код DAG). Если это не так, то 2. Проверьте путь к папке DAG в файле конфигурации Airflow. Вы можете создать файл DAG в любом месте вашей системы, но вам нужно указать путь к этой папке/каталогу DAG в файле конфигурации Airflow.

Например, я создал папку DAG в домашнем каталоге, затем мне нужно отредактировать файл airflow.cfg, используя следующие команды в терминале:

создание папки DAG дома или в корневом каталоге

$mkdir ~/DAG

Редактируемый файл airflow.cfg присутствует в каталоге воздушного потока, где я установил воздушный поток

 ~/$cd airflow
 ~/airflow$nano airflow.cfg

В этом файле измените путь dags_folder к папке DAG, которую мы создали.

Если проблема не устранена, переустановите Airflow и перейдите по этой ссылке для установки Apache Airflow.

Ответ 10

Попробуйте перезапустить планировщик. Планировщик необходимо перезапустить, когда новые DAGS необходимо добавить в сумку DAG