Удаление журналов задач воздушного потока
Я запускаю 5 DAG, которые в течение месяца генерировали в общей сложности около 6 ГБ данных журнала в base_log_folder
. Я просто добавил remote_base_log_folder
, но, похоже, он не исключает запись в base_log_folder
.
Есть ли способ автоматически удалить старые файлы журналов, повернуть их или заставить воздушный поток не регистрироваться на диске (base_log_folder) только в удаленном хранилище?
Ответы
Ответ 1
Пожалуйста, обратитесь https://github.com/teamclairvoyant/airflow-maintenance-dags
В этом плагине есть группы DAG, которые могут убить остановленные задачи и очистить журнал.
Вы можете воспользоваться концепциями и придумать новую DAG, которая может выполнить очистку согласно вашему требованию.
Ответ 2
Мы удаляем журналы задач, реализуя наш собственный FileTaskHandler
, а затем указывая на него в airflow.cfg
. Таким образом, мы перезаписываем LogHandler по умолчанию, чтобы сохранить только N журналов задач, без планирования дополнительных групп доступности баз данных.
Мы используем Airflow==1.10.1
.
[core]
logging_config_class = log_config.LOGGING_CONFIG
log_config.LOGGING_CONFIG
BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
FOLDER_TASK_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}'
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
LOGGING_CONFIG = {
'formatters': {},
'handlers': {
'...': {},
'task': {
'class': 'file_task_handler.FileTaskRotationHandler',
'formatter': 'airflow.job',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'filename_template': FILENAME_TEMPLATE,
'folder_task_template': FOLDER_TASK_TEMPLATE,
'retention': 20
},
'...': {}
},
'loggers': {
'airflow.task': {
'handlers': ['task'],
'level': JOB_LOG_LEVEL,
'propagate': False,
},
'airflow.task_runner': {
'handlers': ['task'],
'level': LOG_LEVEL,
'propagate': True,
},
'...': {}
}
}
file_task_handler.FileTaskRotationHandler
import os
import shutil
from airflow.utils.helpers import parse_template_string
from airflow.utils.log.file_task_handler import FileTaskHandler
class FileTaskRotationHandler(FileTaskHandler):
def __init__(self, base_log_folder, filename_template, folder_task_template, retention):
"""
:param base_log_folder: Base log folder to place logs.
:param filename_template: template filename string.
:param folder_task_template: template folder task path.
:param retention: Number of folder logs to keep
"""
super(FileTaskRotationHandler, self).__init__(base_log_folder, filename_template)
self.retention = retention
self.folder_task_template, self.folder_task_template_jinja_template = \
parse_template_string(folder_task_template)
@staticmethod
def _get_directories(path='.'):
return next(os.walk(path))[1]
def _render_folder_task_path(self, ti):
if self.folder_task_template_jinja_template:
jinja_context = ti.get_template_context()
return self.folder_task_template_jinja_template.render(**jinja_context)
return self.folder_task_template.format(dag_id=ti.dag_id, task_id=ti.task_id)
def _init_file(self, ti):
relative_path = self._render_folder_task_path(ti)
folder_task_path = os.path.join(self.local_base, relative_path)
subfolders = self._get_directories(folder_task_path)
to_remove = set(subfolders) - set(subfolders[-self.retention:])
for dir_to_remove in to_remove:
full_dir_to_remove = os.path.join(folder_task_path, dir_to_remove)
print('Removing', full_dir_to_remove)
shutil.rmtree(full_dir_to_remove)
return FileTaskHandler._init_file(self, ti)
Ответ 3
Сторонники Airflow не считают, что усекающиеся журналы являются частью логики ядра воздушного потока, чтобы увидеть это, а затем в этом выпуске, сопровождающие предложить изменить LOG_LEVEL, чтобы избежать слишком большого количества данных журнала.
И в этот PR, мы можем узнать, как изменить уровень журнала в airflow.cfg
.
удачи.
Ответ 4
Я не думаю, что есть механизм вращения, но вы можете хранить их в S3 или облачном хранилище Google, как описано здесь: https://airflow.incubator.apache.org/configuration.html#logs
Ответ 5
Ответ Франци на Airflow 1.10 правильный, я просто не могу добавлять комментарии.
Одно предостережение: из-за того, как взаимодействуют ведение журнала, многопроцессорная обработка и обработчики по умолчанию Airflow, безопаснее переопределять методы обработчика, чем расширять их, вызывая super() в производном классе обработчика. Потому что обработчики Airflow по умолчанию не используют блокировки