Запуск работы по воздушному потоку на веб-ресурсе
Я хотел знать, могут ли выполняться задачи воздушного потока при получении запроса через HTTP. Меня не интересует часть планирования Airflow. Я просто хочу использовать его в качестве заменителя сельдерея.
Итак, пример операции будет примерно таким.
- Пользователь отправляет форму, запрашивающую какой-либо отчет.
- Бэкэнд получает запрос и отправляет пользователю уведомление о том, что запрос был получен.
- Затем backend запускает работу, используя Airflow для немедленного запуска.
- Затем Airflow выполняет ряд задач, связанных с DAG. Например, сначала вытащите данные из красного смещения, вытащите данные из MySQL, сделайте некоторые операции над двумя наборами результатов, объедините их, а затем загрузите результаты в Amazon S3, отправьте электронное письмо.
Из всего, что я читаю в Интернете, вы можете запускать задания воздушного потока, выполнив airflow ...
в командной строке. Мне было интересно, есть ли api python, который может выполнять одно и то же.
Спасибо.
Ответы
Ответ 1
Airflow REST API Plugin поможет вам здесь. После того, как вы выполнили инструкции по установке плагина, вам просто нужно нажать следующий URL: http://{HOST}:{PORT}/admin/rest_api/api/v1.0/trigger_dag?dag_id={dag_id}&run_id={run_id}&conf={url_encoded_json_parameters}
, заменив dag_id идентификатором вашего dag, либо опустив run_id, либо указав уникальный идентификатор, и передав json для кодирования url для conf (с любым из параметров, которые вам нужны в запущенном dag).
Вот пример функции JavaScript, которая использует jQuery для вызова Airflow api:
function triggerDag(dagId, dagParameters){
var urlEncodedParameters = encodeURIComponent(dagParameters);
var dagRunUrl = "http://airflow:8080/admin/rest_api/api/v1.0/trigger_dag?dag_id="+dagId+"&conf="+urlEncodedParameters;
$.ajax({
url: dagRunUrl,
dataType: "json",
success: function(msg) {
console.log('Successfully started the dag');
},
error: function(e){
console.log('Failed to start the dag');
}
});
}
Ответ 2
Новая опция в потоке воздуха - экспериментальная, но встроенная конечная точка API в более поздних сборках 1.7 и 1.8. Это позволяет вам запустить службу REST на сервере airflow, чтобы прослушивать порт и принимать задания cli.
У меня есть только ограниченный опыт, но я с успехом пробовал тесты. Согласно документам:
/api/experimental/dags/<DAG_ID>/dag_runs
создает dag_run для данного идентификатора dag (POST).
Это наметит немедленный запуск любой метки, которую вы хотите запустить. Тем не менее, он по-прежнему использует планировщик, ожидая сердцебиения, чтобы увидеть, что dag работает, и передать задачи работнику. Это точно такое же поведение, как и в CLI, поэтому я все еще верю, что оно подходит для вашего варианта использования.
Документация по его настройке доступна здесь: https://airflow.apache.org/api.html.
В github также есть несколько простых примеров клиентов под airflow/api/clients
Ответ 3
Вы должны посмотреть на Датчик воздушного потока HTTP для своих нужд. Вы можете использовать это, чтобы вызвать даг.
Ответ 4
Для этой цели можно использовать интерфейс API REST Airflow.
Следующий запрос вызовет DAG:
curl -X POST \
http://<HOST>:8080/api/experimental/dags/process_data/dag_runs \
-H 'Cache-Control: no-cache' \
-H 'Content-Type: application/json' \
-d '{"conf":"{\"START_DATE\":\"2018-06-01 03:00:00\", \"STOP_DATE\":\"2018-06-01 23:00:00\"}'
Следующий запрос извлекает список прогонов Dag для определенного идентификатора DAG:
curl -i -H "Accept: application/json" -H "Content-Type: application/json" -X GET http://<HOST>:8080/api/experimental/dags/process_data/dag_runs
Для работы GET API установите флаг rbac
в True
на airflow.cfg
.
Ниже приведен список доступных API: здесь и там.
Ответ 5
Airflow имеет REST API (в настоящее время экспериментальный) - доступно здесь:
https://airflow.apache.org/api.html#endpoints
Если вы не хотите устанавливать плагины, как предложено в других ответах - вот код, как вы можете сделать это напрямую с помощью API:
def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
endpoint = '/api/experimental/dags/{}/dag_runs'.format(dag_id)
url = urljoin(self._api_base_url, endpoint)
data = self._request(url, method='POST',
json={
"run_id": run_id,
"conf": conf,
"execution_date": execution_date,
})
return data['message']
Дополнительные примеры работы с API-интерфейсом airflow в python доступны здесь:https://github.com/apache/airflow/blob/master/airflow/api/client/json_client.py