Полное сканирование dynamoDb с помощью boto3
Моя таблица составляет около 220 МБ с 250 тыс. записей. Я пытаюсь вытащить все эти данные в python. Я понимаю, что это должен быть пакетный пакетный процесс и зацикливаться, но я не уверен, как я могу установить партии, чтобы начать, где предыдущий остановился.
Есть ли способ фильтровать сканирование? Из того, что я прочитал, фильтрация происходит после загрузки, а загрузка останавливается на 1 мб, поэтому я бы не смог сканировать в новых объектах.
Любая помощь будет оценена.
import boto3
dynamodb = boto3.resource('dynamodb',
aws_session_token = aws_session_token,
aws_access_key_id = aws_access_key_id,
aws_secret_access_key = aws_secret_access_key,
region_name = region
)
table = dynamodb.Table('widgetsTableName')
data = table.scan()
Ответы
Ответ 1
Я думаю, что документация Amazon DynamoDB относительно сканирования таблиц отвечает на ваш вопрос.
Короче говоря, вам нужно будет проверить LastEvaluatedKey
в ответе. Вот пример использования вашего кода:
import boto3
dynamodb = boto3.resource('dynamodb',
aws_session_token=aws_session_token,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region
)
table = dynamodb.Table('widgetsTableName')
response = table.scan()
data = response['Items']
while 'LastEvaluatedKey' in response:
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
data.extend(response['Items'])
Ответ 2
boto3 предлагает paginators, которые обрабатывают все детали разбивки на страницы для вас. Здесь - страница документа для страницы сканирования. В принципе, вы бы использовали его так:
import boto3
client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')
for page in paginator.paginate():
# do something
Ответ 3
Отправляясь от Джордона Филлипса, ответьте, как вы проходите FilterExpression
с разбиением на страницы:
import boto3
client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')
operation_parameters = {
'TableName': 'foo',
'FilterExpression': 'bar > :x AND bar < :y',
'ExpressionAttributeValues': {
':x': {'S': '2017-01-31T01:35'},
':y': {'S': '2017-01-31T02:08'},
}
}
page_iterator = paginator.paginate(**operation_parameters)
for page in page_iterator:
# do something
Ответ 4
Код для удаления типа формата dynamodb, упомянутого как @kungphu.
import boto3
from boto3.dynamodb.types import TypeDeserializer
from boto3.dynamodb.transform import TransformationInjector
client = boto3.client('dynamodb')
paginator = client.get_paginator('query')
service_model = client._service_model.operation_model('Query')
trans = TransformationInjector(deserializer = TypeDeserializer())
for page in paginator.paginate():
trans.inject_attribute_value_output(page, service_model)
Ответ 5
Оказывается, что Boto3 захватывает "LastEvaluatedKey" как часть возвращаемого ответа. Это можно использовать в качестве начальной точки для сканирования:
data= table.scan(
ExclusiveStartKey=data['LastEvaluatedKey']
)
Я планирую создать цикл вокруг этого, пока возвращаемые данные не будут только ExclusiveStartKey
Ответ 6
Оба предложенных выше двух подхода имеют проблемы: либо написание длинного и повторяющегося кода, который явно обрабатывает разбиение на страницы в цикле, либо использование пагинаторов Boto с низкоуровневыми сеансами, а также с недостатками объектов Boto более высокого уровня.
Решение, использующее функциональный код Python для обеспечения абстракции высокого уровня, позволяет использовать методы Boto более высокого уровня, скрывая при этом сложность разбиения на страницы AWS:
import itertools
import typing
def iterate_result_pages(function_returning_response: typing.Callable, *args, **kwargs) -> typing.Generator:
"""A wrapper for functions using AWS paging, that returns a generator which yields a sequence of items for
every response
Args:
function_returning_response: A function (or callable), that returns an AWS response with 'Items' and optionally 'LastEvaluatedKey'
This could be a bound method of an object.
Returns:
A generator which yields the 'Items' field of the result for every response
"""
response = function_returning_response(*args, **kwargs)
yield response["Items"]
while "LastEvaluatedKey" in response:
kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
response = function_returning_response(*args, **kwargs)
yield response["Items"]
return
def iterate_paged_results(function_returning_response: typing.Callable, *args, **kwargs) -> typing.Iterator:
"""A wrapper for functions using AWS paging, that returns an iterator of all the items in the responses.
Items are yielded to the caller as soon as they are received.
Args:
function_returning_response: A function (or callable), that returns an AWS response with 'Items' and optionally 'LastEvaluatedKey'
This could be a bound method of an object.
Returns:
An iterator which yields one response item at a time
"""
return itertools.chain.from_iterable(iterate_result_pages(function_returning_response, *args, **kwargs))
# Example, assuming 'table' is a Boto DynamoDB table object:
all_items = list(iterate_paged_results(ProjectionExpression = 'my_field'))
Ответ 7
У меня были некоторые проблемы с ответом Винсента, связанные с преобразованием, применяемым к LastEvaluatedKey, и путаницей в нумерации страниц. Решается следующим образом:
import boto3
from boto3.dynamodb.types import TypeDeserializer
from boto3.dynamodb.transform import TransformationInjector
client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')
operation_model = client._service_model.operation_model('Scan')
trans = TransformationInjector(deserializer = TypeDeserializer())
operation_parameters = {
'TableName': 'tablename',
}
items = []
for page in paginator.paginate(**operation_parameters):
has_last_key = 'LastEvaluatedKey' in page
if has_last_key:
last_key = page['LastEvaluatedKey'].copy()
trans.inject_attribute_value_output(page, operation_model)
if has_last_key:
page['LastEvaluatedKey'] = last_key
items.extend(page['Items'])
Ответ 8
DynamoDB ограничивает метод scan
до 1 МБ данных на сканирование.
Документация: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.scan
Вот пример цикла, чтобы получить все данные из таблицы DynamoDB, используя LastEvaluatedKey
:
import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('your_table_name')
has_items = True
last_key = False
while has_items:
if last_key:
data = table.scan(ExclusiveStartKey=last_key)
else:
data = table.scan()
if 'LastEvaluatedKey' in data:
has_items = True
last_key = data['LastEvaluatedKey']
else:
has_items = False
last_key = False
# TODO do something with data['Items'] here.