Потребление потока кинезитов в питоне
Я не могу найти достойный пример, который показывает, как я могу использовать поток AWS Kinesis через Python. Может кто-нибудь, пожалуйста, предоставит мне несколько примеров, на которые я мог бы обратить внимание?
Лучшие
Ответы
Ответ 1
Хотя этот вопрос уже был дан ответ, может быть хорошей идеей для будущих читателей рассмотреть возможность использования Kinesis Client Library (KCL) for Python
вместо прямого использования boto
. Это упрощает потребление из потока при наличии нескольких экземпляров клиента и/или изменении конфигураций осколков.
https://aws.amazon.com/blogs/aws/speak-to-kinesis-in-python/
Более полное перечисление того, что предоставляет KCL
- Подключается к потоку
- Перечисляет осколки
- Координаты ассоциаций осколков с другими работниками (если есть)
- Создает процессор записи для каждого осколка, который он управляет.
- Вытягивает записи данных из потока
- Отбрасывает записи в соответствующий процессор записи
- Контрольные точки обрабатывают записи (он использует DynamoDB, поэтому вашему коду не нужно вручную сохранять значение контрольной точки)
- Балансы ассоциаций shard-worker, когда число экземпляров рабочего изменяется
- Балансы ассоциаций оскол-работника, когда осколки разделены или объединены
Элементы, выделенные полужирным шрифтом, являются теми, которые, по моему мнению, являются тем, где KCL действительно предоставляет нетривиальное значение над boto. Но в зависимости от вашего использования, boto может быть намного проще.
Ответ 2
вам следует использовать boto.kinesis:
from boto import kinesis
После создания потока:
шаг 1: подключиться к aws kinesis:
auth = {"aws_access_key_id":"id", "aws_secret_access_key":"key"}
connection = kinesis.connect_to_region('us-east-1',**auth)
Шаг 2: получите информацию о потоке (например, сколько обрывов, если оно активно..)
tries = 0
while tries < 10:
tries += 1
time.sleep(1)
try:
response = connection.describe_stream('stream_name')
if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
break
except :
logger.error('error while trying to describe kinesis stream : %s')
else:
raise TimeoutError('Stream is still not active, aborting...')
Шаг 3: получите все идентификаторы shard и для каждого общего идентификатора получите итератор осколков:
shard_ids = []
stream_name = None
if response and 'StreamDescription' in response:
stream_name = response['StreamDescription']['StreamName']
for shard_id in response['StreamDescription']['Shards']:
shard_id = shard_id['ShardId']
shard_iterator = connection.get_shard_iterator(stream_name, shard_id, shard_iterator_type)
shard_ids.append({'shard_id' : shard_id ,'shard_iterator' : shard_iterator['ShardIterator'] })
Шаг 4: прочитайте данные для каждого осколка
limit - это предел для записей, которые вы хотите получить. (вы можете получить до 10 МБ)
shard_iterator является общим с предыдущего шага.
tries = 0
result = []
while tries < 100:
tries += 1
response = connection.get_records(shard_iterator = shard_iterator , limit = limit)
shard_iterator = response['NextShardIterator']
if len(response['Records'])> 0:
for res in response['Records']:
result.append(res['Data'])
return result , shard_iterator
в следующем вызове get_records, вы должны использовать shard_iterator, который вы получили с результатом предыдущих get_records.
note: в одном вызове get_records (limit = None) вы можете получать пустые записи.
если вы вызываете get_records с лимитом, вы получите записи, которые находятся в одном ключе раздела (когда вы вставляете данные в поток, вам нужно использовать ключ раздела:
connection.put_record(stream_name, data, partition_key)