Макросы в операторе Airflow Python
Могу ли я использовать макросы с PythonOperator? Я попытался выполнить следующие действия, но мне не удалось получить макросы:
dag = DAG(
'temp',
default_args=default_args,
description='temp dag',
schedule_interval=timedelta(days=1))
def temp_def(a, b, **kwargs):
print '{{ds}}'
print '{{execution_date}}'
print 'a=%s, b=%s, kwargs=%s' % (str(a), str(b), str(kwargs))
ds = '{{ ds }}'
mm = '{{ execution_date }}'
t1 = PythonOperator(
task_id='temp_task',
python_callable=temp_def,
op_args=[mm , ds],
provide_context=False,
dag=dag)
Ответы
Ответ 1
Макросы обрабатываются только для шаблонных полей. Чтобы заставить Jinja обрабатывать это поле, расширьте PythonOperator
своим.
class MyPythonOperator(PythonOperator):
template_fields = ('templates_dict','op_args')
Я добавил 'templates_dict'
в template_fields
, потому что сам PythonOperator
имеет это поле templated:
PythonOperator
Теперь вы должны иметь возможность использовать макрос в этом поле:
ds = '{{ ds }}'
mm = '{{ execution_date }}'
t1 = MyPythonOperator(
task_id='temp_task',
python_callable=temp_def,
op_args=[mm , ds],
provide_context=False,
dag=dag)
Ответ 2
По-моему, более подходящим для Airflow способом было бы использовать включенный PythonOperator и использовать параметр provide_context=True
как таковой.
t1 = MyPythonOperator(
task_id='temp_task',
python_callable=temp_def,
provide_context=True,
dag=dag)
Теперь у вас есть доступ ко всем макросам, метаданным воздушного потока и параметрам задачи в kwargs
вашего вызываемого
def temp_def(**kwargs):
print 'ds={}, execution_date={}'.format((str(kwargs['ds']), str(kwargs['execution_date']))
Если у вас есть определенный пользовательский params
, связанный с задачей, вы можете получить доступ к ним также через kwargs['params']