Ответ 1
Трудно найти ссылки, но после копания я смог заставить его работать.
TL;DR
Создайте новое соединение со следующими атрибутами:
Идентификатор соединения: my_conn_S3
Тип подключения: S3
Дополнительно:
{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
Длинная версия, настройка интерфейса пользователя:
- В пользовательском интерфейсе Airflow откройте "Администратор" > "Подключения"
- Создайте новое соединение со следующими атрибутами: Id: my_conn_S3, Тип соединения: S3, Дополнительно: { "aws_access_key_id": "_ your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key _" }
- Оставьте все остальные поля (Host, Schema, Login) пустыми.
Чтобы использовать это соединение, ниже вы можете найти простой S3 Sensor Test. Идея этого теста состоит в том, чтобы настроить датчик, который следит за файлами в S3 (задача T1), и один раз после выполнения условия он запускает команду bash (задача T2).
Тестирование
- Перед запуском DAG убедитесь, что у вас есть ведро S3 с именем "S3-Bucket-To-Watch".
- Добавьте ниже s3_dag_test.py в папку dags для потока воздуха (~/airflow/dags)
- Запустите
airflow webserver
. - Перейдите в интерфейс Airflow (http://localhost:8383/)
- Запустите
airflow scheduler
. - Включите 's3_dag_test' DAG в главном представлении DAG.
- Выберите 's3_dag_test', чтобы показать детали dag.
- На графическом представлении вы должны увидеть текущее состояние. Задача
- 'check_s3_for_file_in_s3' должна быть активной и запущенной.
- Теперь добавьте файл с именем "file-to-watch-1" в "S3-Bucket-To-Watch".
- Первые задачи должны быть завершены, второй должен быть запущен и закончен.
Расписание_интервал в определении dag устанавливается в '@once', чтобы облегчить отладку.
Чтобы запустить его снова, оставьте все как есть, удалите файлы в ведро и повторите попытку, выбрав первую задачу (в виде графика) и выбрав "Очистить" все "Прошлое", "Будущее", "Восходящий поток", 'Downstream'.... активность. Это должно снова запустить DAG.
Сообщите мне, как это получилось.
s3_dag_test.py;
"""
S3 Sensor Connection Test
"""
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, BashOperator, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('s3_dag_test', default_args=default_args, schedule_interval= '@once')
t1 = BashOperator(
task_id='bash_test',
bash_command='echo "hello, it should work" > s3_conn_test.txt',
dag=dag)
sensor = S3KeySensor(
task_id='check_s3_for_file_in_s3',
bucket_key='file-to-watch-*',
wildcard_match=True,
bucket_name='S3-Bucket-To-Watch',
s3_conn_id='my_conn_S3',
timeout=18*60*60,
poke_interval=120,
dag=dag)
t1.set_upstream(sensor)
Основные ссылки: