Настройка s3 для журналов в воздушном потоке
Я использую docker-compose для создания масштабируемого кластера воздушного потока. Я основывал свой подход на этом Dockerfile https://hub.docker.com/r/puckel/docker-airflow/
Моя проблема заключается в том, что журналы созданы для записи/чтения из s3. Когда завершение dag завершено, я получаю ошибку, подобную этой
*** Log file isn't local.
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00
*** Failed to fetch log file from worker.
*** Reading remote logs...
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06-
26T11:00:00
Я установил новый раздел в файле airflow.cfg
, подобный этому
[MyS3Conn]
aws_access_key_id = xxxxxxx
aws_secret_access_key = xxxxxxx
aws_default_region = xxxxxxx
Затем укажите путь s3 в разделе удаленных журналов в airflow.cfg
remote_base_log_folder = s3://buckets/xxxx/airflow/logs
remote_log_conn_id = MyS3Conn
Я правильно настроил это, и есть ошибка? Есть ли рецепт успеха здесь, который мне не хватает?
- Обновить
Я попробовал экспортировать в форматах URI и JSON и, похоже, не работал. Затем я экспортировал aws_access_key_id и aws_secret_access_key, а затем поток воздуха начал его собирать. Теперь я получаю его ошибку в рабочих журналах
6/30/2017 6:05:59 PMINFO:root:Using connection to: s3
6/30/2017 6:06:00 PMERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMLogging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00
- Обновить
Я тоже нашел эту ссылку
https://www.mail-archive.com/[email protected]/msg00462.html
Затем я обрушился на одну из моих рабочих машин (отдельно от веб-сервера и планировщика) и запустил этот бит кода в python
import airflow
s3 = airflow.hooks.S3Hook('s3_conn')
s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder'))
Я получаю эту ошибку.
boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden
Я попытался экспортировать несколько различных типов AIRFLOW_CONN_
envs, как описано здесь, в разделе соединений https://airflow.incubator.apache.org/concepts.html и другими ответами на этот вопрос.
s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3
{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"}
{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}
Я также экспортировал AWS_ACCESS_KEY_ID и AWS_SECRET_ACCESS_KEY без успеха.
Эти учетные данные хранятся в базе данных, поэтому, как только я добавлю их в пользовательский интерфейс, они должны быть отобраны рабочими, но по какой-то причине они не могут писать/читать журналы.
Ответы
Ответ 1
Вам необходимо настроить соединение s3 через интерфейс пользовательского интерфейса. Для этого вам нужно перейти на вкладку Admin → Connections в интерфейсе воздушного потока и создать новую строку для вашего S3-соединения.
Пример конфигурации:
Идентификатор соединения: my_conn_S3
Тип соединения: S3
Дополнительно: { "aws_access_key_id": "your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key" }
Ответ 2
ОБНОВЛЕНИЕ Airflow 1.10 делает регистрацию намного проще.
Для ведения журнала s3, установите хук соединения согласно ответу выше
а затем просто добавьте следующее в airflow.cfg
[core]
# Airflow can store logs remotely in AWS S3. Users must supply a remote
# location URL (starting with either 's3://...') and an Airflow connection
# id that provides access to the storage location.
remote_base_log_folder = s3://my-bucket/path/to/logs
remote_log_conn_id = MyS3Conn
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
Для регистрации gcs,
-
Сначала установите пакет gcp_api, например, так: pip install apache-airflow [gcp_api].
-
Установите хук соединения согласно ответу выше
-
Добавьте следующее в airflow.cfg
[core]
# Airflow can store logs remotely in AWS S3. Users must supply a remote
# location URL (starting with either 's3://...') and an Airflow connection
# id that provides access to the storage location.
remote_logging = True
remote_base_log_folder = gs://my-bucket/path/to/logs
remote_log_conn_id = MyGCSConn
ПРИМЕЧАНИЕ. По состоянию на Airflow 1.9 удаленное ведение журнала значительно изменилось. Если вы используете 1.9, читайте дальше.
Ссылка здесь
Полные инструкции:
-
Создайте каталог для хранения конфигов и разместите его так, чтобы его можно было найти в PYTHONPATH. Одним из примеров является $ AIRFLOW_HOME/config
-
Создайте пустые файлы с именами $ AIRFLOW_HOME/config/log_config.py и $ AIRFLOW_HOME/config/__ init__.py
-
Скопируйте содержимое airflow/config_templates/airflow_local_settings.py в файл log_config.py, который был только что создан на шаге выше.
-
Настройте следующие части шаблона:
#Add this variable to the top of the file. Note the trailing slash.
S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'
Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG
LOGGING_CONFIG = ...
Add a S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
's3.task': {
'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
's3_log_folder': S3_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead >of 'file.task'.
'loggers': {
'airflow.task': {
'handlers': ['s3.task'],
...
},
'airflow.task_runner': {
'handlers': ['s3.task'],
...
},
'airflow': {
'handlers': ['console'],
...
},
}
-
Убедитесь, что в воздушном потоке был определен соединительный крюк s3 согласно ответу выше. Хук должен иметь доступ на чтение и запись к корзине s3, определенной выше в S3_LOG_FOLDER.
-
Обновите $ AIRFLOW_HOME/airflow.cfg, чтобы он содержал:
task_log_reader = s3.task
logging_config_class = log_config.LOGGING_CONFIG
remote_log_conn_id = <name of the s3 platform hook>
-
Перезапустите веб-сервер и планировщик Airflow и запустите (или дождитесь) выполнение новой задачи.
-
Убедитесь, что журналы отображаются для вновь выполненных задач в определенной вами корзине.
-
Убедитесь, что средство просмотра хранилища s3 работает в пользовательском интерфейсе. Подтяните недавно выполненную задачу и убедитесь, что вы видите что-то вроде:
*** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
[2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
[2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
[2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
[2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
Ответ 3
Вот решение, если вы не хотите использовать интерфейс администратора.
Мой процесс развертывания Dockerized, и я никогда не касаюсь интерфейса администратора. Мне также нравится задавать переменные среды, специфичные для Airflow, в скрипте bash, который переопределяет файл .cfg.
расход воздуха [s3]
Прежде всего, вам нужен установленный подпакет s3
для записи ваших журналов Airflow в S3. (boto3
отлично работает для заданий Python внутри ваших групп DAG, но S3Hook
зависит от подпакета s3.)
Еще одно замечание: conda install еще не справилась с этим, поэтому я должен выполнить pip install airflow[s3]
.
Переменные среды
В скрипте bash я установил эти core
переменные. Начиная с этих инструкций, но используя соглашение об именах AIRFLOW__{SECTION}__{KEY}
для переменных среды, я делаю:
export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri
export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False
S3 ID соединения
s3_uri
- это идентификатор соединения, который я создал. В Airflow это соответствует другой переменной среды, AIRFLOW_CONN_S3_URI
. Значение этого - ваш путь S3, который должен быть в форме URI. Это
s3://access_key:[email protected]/key
Сохраните это, однако, вы обрабатываете другие конфиденциальные переменные среды
С этой конфигурацией Airflow запишет ваши логи на S3. Они будут следовать по пути s3://bucket/key/dag/task_id
.
Ответ 4
Чтобы завершить ответ Арне с последними обновлениями Airflow, вам не нужно устанавливать значение task_log_reader
от значения по умолчанию: task
Как если бы вы следовали шаблону регистрации по умолчанию airflow/config_templates/airflow_local_settings.py, который вы можете видеть после этой фиксации (обратите внимание, что имя обработчика изменилось на 's3': {'task'...
вместо s3.task
), что значение на удаленная папка (REMOTE_BASE_LOG_FOLDER
) заменит обработчик на правильный:
REMOTE_LOGGING = conf.get('core', 'remote_logging')
if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])
Подробнее о том, как войти в систему/прочитать с S3: https://github.com/apache/incubator-airflow/blob/master/docs/howto/write-logs.rst#writing-logs-to-amazon-s3
Ответ 5
Просто примечание для тех, кто следит за очень полезными инструкциями в ответе выше: Если вы наткнулись на эту проблему: "ModuleNotFoundError: Нет модуля с именем" airflow.utils.log.logging_mixin.RedirectStdHandler "", на который ссылаются здесь (что происходит при использовании airflow 1.9), исправить это просто - используйте скорее этот базовый шаблон: https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/config_templates/airflow_local_settings.py (и следуйте всем остальным инструкциям в ответ выше)
Текущий шаблон инкубатора-airflow/airflow/config_templates/airflow_local_settings.py, присутствующий в основной ветке, содержит ссылку на класс "airflow.utils.log.s3_task_handler.S3TaskHandler", которого нет в apache-airflow == 1.9.0 python пакет. Надеюсь это поможет!
Ответ 6
Пусть он работает с Airflow 10 в кубе. У меня есть следующие наборы env var:
AIRFLOW_CONN_LOGS_S3=s3://id:[email protected]
AIRFLOW__CORE__REMOTE_LOGGING=True
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://xxxx/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=logs_s3