Execute_date в воздушном потоке: нужно получить доступ как переменную
Я действительно новичок на этом форуме. Но я играю с воздушным потоком, когда-то, для нашей компании. Извините, если этот вопрос звучит очень глупо.
Я пишу конвейер, используя группу BashOperators.
В принципе, для каждой задачи я хочу просто вызвать REST api, используя "curl"
Вот как выглядит мой конвейер (очень упрощенная версия):
from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime
datetime_obj = datetime.datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': datetime.timedelta(minutes=5),
}
current_datetime = datetime_obj.now(tz=tz.tzlocal())
dag = DAG(
'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))
curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'
t1 = BashOperator(
task_id='rest-api-1',
bash_command=curl_cmd,
dag=dag)
Если вы заметили, что я делаю current_datetime= datetime_obj.now(tz=tz.tzlocal())
Вместо этого я хочу здесь 'execute_date'
Как использовать 'execute_date' и назначить его переменной в моем файле python?
У меня есть эта общая проблема доступа к args.
Любая помощь будет искренне оценена.
Спасибо
Ответы
Ответ 1
BashOperator
bash_command
является шаблоном. Вы можете получить доступ к execution_date
в любом шаблоне как datetime
и execution_date
datetime
объекта, используя execution_date
переменную. В шаблоне вы можете использовать любые методы jinja2
для манипулирования им.
Используя следующее в bash_command
строки BashOperator
bash_command
:
# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}
# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}
Если вы просто хотите получить строковый эквивалент даты выполнения, ds
вернет метку даты (ГГГГ-ММ-ДД), ds_nodash
вернет ее без черточек (ГГГГММДД) и т.д. Подробнее о macros
можно узнать из документации Api.
Ваш последний оператор будет выглядеть так:
command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)
Ответ 2
Конструктор PythonOperator принимает параметр "обеспечить_контекст" (см. https://pythonhosted.org/airflow/code.html). Если он True, то он передает ряд параметров в python_callable через kwargs. Полагаю, что kwargs ['execution_date'] - это то, что вы хотите.
Что-то вроде этого:
def python_method(ds, **kwargs):
Variable.set('execution_date', kwargs['execution_date'])
return
doit = PythonOperator(
task_id='doit',
provide_context=True,
python_callable=python_method,
dag=dag)
Я не уверен, как это сделать с BashOperator, но вы можете начать с этой проблемы: https://github.com/airbnb/airflow/issues/775
Ответ 3
Я думаю, что вы не можете назначать переменные со значениями из контекста воздушного потока вне экземпляра задачи, они доступны только во время выполнения. В основном есть 2 различных шага, когда даг загружается и выполняется в потоке воздуха:
-
Сначала ваш файл dag интерпретируется и анализируется. Он должен работать и компилироваться, а определения задач должны быть правильными (без синтаксической ошибки или чего-либо еще). На этом этапе, если вы вызовете функцию для заполнения некоторых значений, эти функции не смогут получить доступ к контексту воздушного потока (например, к дате выполнения, даже больше, если вы выполняете некоторую обратную засыпку).
-
Второй шаг - выполнение Дага. Только на этом втором шаге доступны переменные, предоставляемые потоком воздуха (execution_date, ds, etc...
), Так как они связаны с выполнением dag.
Таким образом, вы не можете инициализировать глобальные переменные, используя контекст Airflow, однако, Airflow предоставляет вам несколько механизмов для достижения одного и того же эффекта:
-
Используя шаблон jinja в вашей команде (он может быть в строке в коде или в файле, оба будут обработаны). У вас есть список доступных шаблонов здесь: https://airflow.apache.org/macros.html#default-variables. Обратите внимание, что некоторые функции также доступны, особенно для вычисления дней дельта и форматирования даты.
-
Использование PythonOperator, в котором вы передаете контекст (с аргументом provide_context
). Это позволит вам получить доступ к одному и тому же шаблону с помощью синтаксиса kwargs['<variable_name']
. Если вам нужно, вы можете вернуть значение из PythonOperator, оно будет сохранено в переменной XCOM, которую вы сможете использовать позже в любом шаблоне. Доступ к переменным XCOM использует следующий синтаксис: https://airflow.apache.org/concepts.html#xcoms
-
Если вы напишите свой собственный оператор, вы сможете получить доступ к переменным воздушного потока в context
dict.
Ответ 4
def execute(self, context):
execution_date = context.get("execution_date")
Это должно быть внутри метода execute() оператора
Ответ 5
Выполнение_date, (datetime.datetime)
{{ execution_date }}
Ответ 6
Чтобы напечатать дату выполнения внутри вызываемой функции вашего PythonOperator
вы можете использовать следующее в PythonOperator
Airflow, а также можете добавить start_time
и end_time
следующим образом:
def python_func(**kwargs):
ts = kwargs["execution_date"]
end_time = str(ts)
start_time = str(ts.add(minutes=-30))
Я преобразовал значение datetime в строку, так как мне нужно передать его в запросе SQL. Мы можем использовать это иначе.
Ответ 7
Вы можете рассмотреть SimpleHttpOperator https://airflow.apache.org/_api/airflow/operators/http_operator/index.html#airflow.operators.http_operator.SimpleHttpOperator. Это так просто сделать http запрос. Вы можете передать execute_date с параметром конечной точки через шаблон.