Ответ 1
Есть несколько способов запланировать эту задачу. Как вы планируете свои рабочие процессы? Используете ли вы такую систему, как Airflow, Luigi, Azkaban, cron или конвейер данных AWS?
Из любого из них вы сможете запустить следующую команду CLI.
$ aws athena start-query-execution --query-string "MSCK REPAIR TABLE some_database.some_table" --result-configuration "OutputLocation=s3://SOMEPLACE"
Другим вариантом будет AWS Lambda. У вас может быть функция, которая вызывает MSCK REPAIR TABLE some_database.some_table
в ответ на новую загрузку на S3.
Пример лямбда-функции может быть записан так:
import boto3
def lambda_handler(event, context):
bucket_name = 'some_bucket'
client = boto3.client('athena')
config = {
'OutputLocation': 's3://' + bucket_name + '/',
'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
}
# Query Execution Parameters
sql = 'MSCK REPAIR TABLE some_database.some_table'
context = {'Database': 'some_database'}
client.start_query_execution(QueryString = sql,
QueryExecutionContext = context,
ResultConfiguration = config)
Затем вы должны сконфигурировать триггер для выполнения вашей лямбда-функции, когда новые данные добавляются под префиксом DATA/
в вашем сегменте.
В конечном счете, явная перестройка разделов после запуска Spark Job с использованием планировщика заданий имеет преимущество самодокументирования. С другой стороны, AWS Lambda удобна для таких работ, как эта.