Воздушный поток: как удалить DAG?
Я запустил веб-сервер Airflow и заплатил несколько промахов. Я могу видеть dags в веб-графическом интерфейсе.
Как я могу удалить определенную группу DAG для запуска и отобразить в веб-графическом интерфейсе? Есть ли команда CLI Airflow для этого?
Я огляделся, но не смог найти ответ на простой способ удаления DAG после его загрузки и планирования.
Ответы
Ответ 1
Редактировать 27/8/18 - Airflow 1.10 теперь выпущен на PyPI!
https://pypi.org/project/apache-airflow/1.10.0/
Как полностью удалить DAG
У нас есть эта функция сейчас в Airflow ≥ 1.10!
PR № 2199 (Jira: AIRFLOW-1002), добавляющий удаление DAG в Airflow, теперь объединен, что позволяет полностью удалить записи DAG из всех связанных таблиц.
Базовый код delete_dag (...) теперь является частью экспериментального API, и есть точки входа, доступные через CLI, а также через REST API.
CLI:
airflow delete_dag my_dag_id
REST API (локально работающий веб-сервер):
curl -X "DELETE" http://127.0.0.1:8080/api/experimental/dags/my_dag_id
Предупреждение относительно REST API: убедитесь, что ваш кластер Airflow использует аутентификацию на производстве.
Установка/обновление до Airflow 1.10 (актуально)
Для обновления запустите:
export SLUGIFY_USES_TEXT_UNIDECODE=yes
или же:
export AIRFLOW_GPL_UNIDECODE=yes
Затем:
pip install -U apache-airflow
Не забудьте сначала проверить UPDATING.md на полную информацию!
Ответ 2
Это мой адаптированный код, используя PostgresHook с параметром connection_id по умолчанию.
import sys
from airflow.hooks.postgres_hook import PostgresHook
dag_input = sys.argv[1]
hook=PostgresHook( postgres_conn_id= "airflow_db")
for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
sql="delete from {} where dag_id='{}'".format(t, dag_input)
hook.run(sql, True)
Ответ 3
Не уверен, почему Apache Airflow не имеет очевидного и простого способа удалить DAG
Filed https://issues.apache.org/jira/browse/AIRFLOW-1002
Ответ 4
Я просто написал script, который удаляет все, что связано с определенным dag, но это только для MySQL. Вы можете написать другой метод соединения, если используете PostgreSQL. Первоначально команды, которые размещены Лэнсом на https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0
Я просто положил его в script. Надеюсь это поможет. Формат: python script.py dag_id
import sys
import MySQLdb
dag_input = sys.argv[1]
query = {'delete from xcom where dag_id = "' + dag_input + '"',
'delete from task_instance where dag_id = "' + dag_input + '"',
'delete from sla_miss where dag_id = "' + dag_input + '"',
'delete from log where dag_id = "' + dag_input + '"',
'delete from job where dag_id = "' + dag_input + '"',
'delete from dag_run where dag_id = "' + dag_input + '"',
'delete from dag where dag_id = "' + dag_input + '"' }
def connect(query):
db = MySQLdb.connect(host="hostname", user="username", passwd="password", db="database")
cur = db.cursor()
cur.execute(query)
db.commit()
db.close()
return
for value in query:
print value
connect(value)
Ответ 5
DAG-ы могут быть удалены в Airflow 1.10, но процесс и последовательность действий должны быть правильными.
Возникает "проблема с яйцом и курицей" - если вы удалите DAG из внешнего интерфейса, пока файл еще там, DAG будет перезагружен (так как файл не удален). Если вы сначала удалите файл и обновите страницу, DAG больше не удалится из веб-интерфейса.
Итак, последовательность действий, которая позволила мне удалить DAG из внешнего интерфейса, была такой:
- удалить файл DAG (в моем случае удалить из репозитория конвейера и развернуть на серверах воздушного потока, особенно в планировщике)
- НЕ обновляйте веб-интерфейс.
- В веб-интерфейсе пользователя в представлении DAG (обычная главная страница) нажмите "Удалить метку" →
красный значок справа внизу.
- Он очищает все остатки этого DAG из базы данных.
Ответ 6
Я написал script, который удаляет все метаданные, относящиеся к определенному дагу для SQLite по умолчанию SQLite. Это основано на ответе Иисуса выше, но адаптировано из Postgres для SQLite. Пользователи должны устанавливать ../airflow.db
везде, где script.py хранится относительно файла airflow.db по умолчанию (обычно ~/airflow
). Для выполнения используйте python script.py dag_id
.
import sqlite3
import sys
conn = sqlite3.connect('../airflow.db')
c = conn.cursor()
dag_input = sys.argv[1]
for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
query = "delete from {} where dag_id='{}'".format(t, dag_input)
c.execute(query)
conn.commit()
conn.close()
Ответ 7
Воздушный поток 1.10.1 был выпущен. В этом выпуске добавлена возможность удаления DAG из веб-интерфейса после удаления соответствующей DAG из файловой системы.
Смотрите этот билет для более подробной информации:
[AIRFLOW-2657] Добавлена возможность удаления DAG из веб-интерфейса.
![Airflow Links menu with delete icon]()
Обратите внимание, что это на самом деле не удаляет группу обеспечения доступности баз данных из файловой системы, сначала вам нужно будет сделать это вручную, в противном случае группа доступности базы данных будет перезагружена.
Ответ 8
В Airflow нет ничего встроенного, который сделает это за вас. Чтобы удалить DAG, удалите его из репозитория и удалите записи базы данных в таблице метастатистики Airflow - dag.
Ответ 9
Вы можете очистить набор экземпляров задачи, как если бы они никогда не запускались с помощью:
airflow clear dag_id -s 2017-1-23 -e 2017-8-31
И затем удалите файл dag из папки dags
Ответ 10
версии> = 1.10.0:
У меня версия 1.10.2 airflow, и я попытался выполнить команду delete_dag airflow, но команда выдает следующую ошибку:
bash-4.2 # airflow delete_dag dag_id
[2019-03-16 15: 37: 20,804] {settings.py:174} INFO - settings.configure_orm(): использование настроек пула. pool_size = 5, pool_recycle = 1800, pid = 28224
/usr/lib64/python2.7/site-packages/psycopg2/init.py: 144: предупреждение пользователя: пакет колеса psycopg2 будет переименован из выпуска 2.8; чтобы продолжить установку из бинарного файла, используйте вместо этого "pip install psycopg2-binary". Подробнее см.: http://initd.org/psycopg/docs/install.html#binary-install-from-pypi. "" ")
Это приведет к удалению всех существующих записей, связанных с указанным DAG. Продолжить? (Г/л) у
Traceback (последний вызов был последним): Файл "/usr/bin/airflow", строка 32, в args.func (арг) Файл "/usr/lib/python2.7/site-packages/airflow/utils/cli.py", строка 74, в оболочке вернуть f (* args, ** kwargs) Файл "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", строка 258, в delete_dag повысить AirflowException (ошибка)
airflow.exceptions.AirflowException: ошибка сервера
Хотя я могу удалить через команду Curl.
Пожалуйста, дайте мне знать, если у кого-то есть представление об исполнении этой команды, это известно или я что-то не так делаю.
версии & lt; = 1.9.0:
Команды для удаления dag нет, поэтому сначала необходимо удалить файл dag, а затем удалить все ссылки на dag_id из базы данных метаданных воздушного потока.
ПРЕДУПРЕЖДЕНИЕ
Вы можете сбросить базу данных метаданных воздушного потока, вы удалите все, включая теги, но помните, что вы также удалите историю, пулы, переменные и т.д.
airflow resetdb
, а затем airflow initdb
Ответ 11
Основываясь на ответе @OlegYamin, я делаю следующее, чтобы удалить метку, поддерживаемую postgres, где airflow использует public
схему.
delete from public.dag_pickle where id = (
select pickle_id from public.dag where dag_id = 'my_dag_id'
);
delete from public.dag_run where dag_id = 'my_dag_id';
delete from public.dag_stats where dag_id = 'my_dag_id';
delete from public.log where dag_id = 'my_dag_id';
delete from public.sla_miss where dag_id = 'my_dag_id';
delete from public.task_fail where dag_id = 'my_dag_id';
delete from public.task_instance where dag_id = 'my_dag_id';
delete from public.xcom where dag_id = 'my_dag_id';
delete from public.dag where dag_id = 'my_dag_id';
ВНИМАНИЕ: Эффект/правильность первого запроса на удаление мне неизвестна. Это просто предположение, что это необходимо.
Ответ 12
просто удалите его из MySQL, отлично работает для меня. удалите их из таблиц ниже:
-
даг
-
dag_constructor
- dag_group_ship
- dag_pickle
- dag_run
- dag_stats
(в будущем выпуске может быть больше таблиц), затем перезапустите веб-сервер и работника.
Ответ 13
Удалите dag (вы хотите удалить) из папки dags и запустите airflow resetdb
.
Кроме того, вы можете войти в airflow_db и вручную удалить эти записи из таблиц dag (task_fail, xcom, task_instance, sla_miss, log, job, dag_run, dag, dag_stats).
Ответ 14
Для тех, кто все еще находит ответы. В версии Airflow версии 1.8 очень сложно удалить DAG, вы можете обратиться к ответам выше. Но начиная с версии 1.9, вам просто нужно
удалить dag в папке dags и перезапустить веб-сервер