Airflow - файл Python NOT в одной папке DAG
Я пытаюсь использовать Airflow для выполнения простого python задачи.
from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from pprint import pprint
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
args = {
'owner': 'airflow',
'start_date': seven_days_ago,
}
dag = DAG(dag_id='python_test', default_args=args)
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=print_context,
dag=dag)
Если я попробую, например:
тест воздушного потока python_test print 2015-01-01
Это работает!
Теперь я хочу поместить мою функцию def print_context(ds, **kwargs)
в другой файл python. Поэтому я создаю файл antoher с именем: simple_test.py и изменяю:
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=simple_test.print_context,
dag=dag)
Теперь я снова пытаюсь запустить:
тест воздушного потока python_test print 2015-01-01
И ОК! Он по-прежнему работает!
Но если я создаю модуль, например, рабочий модуль с файлом SimplePython.py
, импортируйте (from worker import SimplePython
) его и попробуйте:
тест воздушного потока python_test print 2015-01-01
Он выдает сообщение:
ImportError: нет модуля с именем worker
Вопросы:
- Можно ли импортировать модуль внутри определения DAG?
- Как Airflow + Celery собирается распространять все необходимые файлы источников python на рабочих узлах?
Ответы
Ответ 1
Вы можете упаковать зависимости вашей группы доступности базы данных в соответствии с:
https://airflow.apache.org/concepts.html#packaged-dags
Чтобы разрешить это, вы можете создать zip файл, который содержит dag в корне zip файла и распаковать дополнительные модули в каталогах. Например, вы можете создать ZIP файл, который выглядит следующим образом:
my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py
Airflow просканирует zip файл и попытается загрузить my_dag1.py и my_dag2.py. Он не будет входить в подкаталоги, поскольку они считаются потенциальными пакетами.
При использовании CeleryExecutor необходимо вручную синхронизировать каталоги DAG, Airflow не позаботится об этом за вас:
https://airflow.apache.org/configuration.html?highlight=scaling%20out%20celery#scaling-out-with-celery
Рабочий должен иметь доступ к своему DAGS_FOLDER, и вам нужно синхронизировать файловые системы своими собственными средствами
Ответ 2
В то время как упаковка ваших пакетов в zip, как описано в документации, является единственным поддерживаемым решением, которое я видел, вы также можете импортировать модули, которые находятся в папке dags. Это полезно, если вы автоматически синхронизируете папку dags с помощью других инструментов, таких как puppet & мерзавец.
Мне не ясна структура вашего каталога из вопроса, поэтому вот пример папки dags на основе типичной структуры проекта python:
└── airflow/dags # root airflow dags folder where all dags live
└── my_dags # git repo project root
├── my_dags # python src root (usually named same as project)
│ ├── my_test_globals.py # file I want to import
│ ├── dag_in_package.py
│ └── dags
│ └── dag_in_subpackage.py
├── README.md # also setup.py, LICENSE, etc here
└── dag_in_project_root.py
Я пропустил (обязательные [1]) __init__.py
файлы. Обратите внимание на расположение трех примеров дагов. Вы почти наверняка использовали бы только одно из этих мест для всех своих дагов. Я привожу их все сюда для примера, потому что это не должно иметь значения для импорта. Чтобы импортировать my_test_globals
из любого из них:
from my_dags.my_dags import my_test_globals
Я полагаю, что это означает, что поток воздуха запускается с путем python, указанным в каталоге dags, поэтому каждый подкаталог в папке dags может рассматриваться как пакет python. В моем случае это был дополнительный промежуточный корневой каталог проекта, который мешал выполнять типичный абсолютный импорт внутри пакета. Таким образом, мы могли бы реструктурировать этот проект воздушного потока следующим образом:
└── airflow/dags # root airflow dags folder where all dags live
└── my_dags # git repo project root & python src root
├── my_test_globals.py # file I want to import
├── dag_in_package.py
├── dags
│ └── dag_in_subpackage.py
├── README.md # also setup.py, LICENSE, etc here
└── dag_in_project_root.py
Чтобы импорт выглядел так, как мы ожидаем:
from my_dags import my_test_globals
Ответ 3
Для вашего первого вопроса это возможно.
И я думаю, вы должны создать пустой файл с именем __init__.py
в том же каталоге с SimplePython.py
(это ваш каталог worker
в вашем случае). При этом каталог worker
будет рассматриваться как модуль python.
Затем в определении DAG попробуйте from worker.SimplePython import print_context
.
В вашем случае, я думаю, было бы лучше, если бы вы создали плагин для воздушного потока, потому что вам может понадобиться обновить проект ядра воздушного потока, не снимая ваши настраиваемые функции.
Ответ 4
Для вашего второго вопроса: как Airflow + Celery собирается распространять все необходимые файлы источников python на рабочих узлах?
Из документации: Работник должен иметь доступ к своему DAGS_FOLDER, и вам нужно синхронизировать файловые системы по своему усмотрению. Обычной установкой было бы сохранить ваш DAGS_FOLDER в репозитории Git и синхронизировать его на машинах с помощью Chef, Puppet, Ansible или любого другого, что вы используете для настройки компьютеров в вашей среде. Если у всех ваших ящиков есть общая точка монтирования, ваши файлы с конвейерами должны работать вместе.
http://pythonhosted.org/airflow/installation.html?highlight=chef