Невозможно импортировать плагины Airflow
После Airflow учебник здесь.
Проблема: веб-сервер возвращает следующую ошибку
Broken DAG: [/usr/local/airflow/dags/test_operator.py] cannot import name
MyFirstOperator
Примечания. Структура каталогов выглядит так:
airflow_home
├── airflow.cfg
├── airflow.db
├── dags
│ └── test_operators.py
├── plugins
│ └── my_operators.py
└── unittests.cfg
Я пытаюсь импортировать плагин в 'test_operators.py' следующим образом:
from airflow.operators import MyFirstOperator
Код все тот же, что и в учебнике.
Ответы
Ответ 1
В статье он делает следующее:
class MyFirstPlugin(AirflowPlugin):
name = "my_first_plugin"
operators = [MyFirstOperator]
Вместо этого используйте:
class MyFirstPlugin(AirflowPlugin):
name = "my_first_plugin"
operators = [MyFirstOperator]
# A list of class(es) derived from BaseHook
hooks = []
# A list of class(es) derived from BaseExecutor
executors = []
# A list of references to inject into the macros namespace
macros = []
# A list of objects created from a class derived
# from flask_admin.BaseView
admin_views = []
# A list of Blueprint object created from flask.Blueprint
flask_blueprints = []
# A list of menu links (flask_admin.base.MenuLink)
menu_links = []
Также не используйте:
from airflow.operators import MyFirstOperator
Согласно статье воздушного потока о плагинах, она должна быть:
from airflow.operators.my_first_plugin import MyFirstOperator
Если это не работает, попробуйте:
from airflow.operators.my_operators import MyFirstOperator
Если это не сработает, обратитесь за дополнительной информацией к журналу веб-сервера при запуске.
Ответ 2
Я перезапустил веб-сервер, и теперь все работает нормально.
Вот что, я думаю, могло случиться:
- Прежде чем начать с примера, я попытался запустить собственный плагин и dag. Была небольшая синтаксическая ошибка в первом запуске, который я исправил, однако после исправления я начал получать ошибку "не импортировать имя".
- Я удалил плагин и dag и попытался использовать ту, что из учебника, чтобы посмотреть, что происходит.
Я предполагаю, что ошибка с шага 1 каким-то образом повлияла на шаг 2.
Ответ 3
Я использую воздушный поток 1.10. Если это пользовательский оператор, который вы хотите импортировать, вы можете загрузить его в папку плагинов airflow, а затем в DAG указать импорт как:
из [имя файла] импорт [имя класса]
где: имя файла - это имя вашего файла плагина. имя класса - это имя вашего класса.
Например: если имя вашего файла - my_first_plugin, а имя класса - MyFirstOperator, то импорт будет:
из my_first_plugin импортировать MyFirstOperator
Работал для меня, как я использую воздушный поток 1.10
Спасибо ! Надеюсь это поможет !!
Ответ 4
Я столкнулся с той же ошибкой, следуя этим урокам.
Однако моя ошибка заключалась в том, что я использовал пробел ' '
в task_id
, который не поддерживается Airflow
.
Ясно, что ошибка не указывает на реальную проблему. Перезапуск scheduler
Airflow и webserver
показал правильное сообщение об ошибке в WebUI.
Ответ 5
Мне пришлось обновить путь к плагину в файле airflow.cfg
, чтобы решить проблему.
Где хранятся ваши плагины Airflow:
plugins_folder = /airflow/plugins
Ответ 6
Согласно документам -
Модули python в папке плагинов импортируются, а хуки, операторы, датчики, макросы, исполнители и веб-представления интегрируются в основные коллекции Airflows и становятся доступными для использования.
и отлично работает в версии 1.10.1
Ответ 7
В моем случае мне удалось сделать пользовательский оператор со следующими шагами:
- Воздушный поток 10,3
- в файле DAG
from airflow.operators import MacrosPostgresOperator
- В папке ~/airflow/plugins у меня есть файл python
custom_operator.py
и код довольно прост
from airflow.plugins_manager import AirflowPlugin
from airflow.operators.postgres_operator import PostgresOperator
class MacrosPostgresOperator(PostgresOperator):
template_fields = ('sql', 'parameters')
class MacrosFirstPlugin(AirflowPlugin):
name = "macros_first_plugin"
operators = [MacrosPostgresOperator]
Ответ 8
Вы должны остановить (CTRL-C) и перезапустить веб-сервер и планировщик Airflow.