Как использовать значение смещения EventData ServiceBus
У меня есть код, который использует данные события Service Bus, и я подозреваю, что мне нужно использовать свойство offset, поскольку в настоящее время моя программа (или, похоже,) повторно запускает одни и те же данные Event Hub снова и снова.
Мой код выглядит следующим образом:
public class EventHubListener : IEventProcessor
{
private static EventHubClient _eventHubClient;
private const string EhConnectionStringNoPath = "Endpoint=...";
private const string EhConnectionString = EhConnectionStringNoPath + ";...";
private const string EhEntityPath = "...";
public void Start()
{
_eventHubClient = EventHubClient.CreateFromConnectionString(EhConnectionString);
EventHubConsumerGroup defaultConsumerGroup = _eventHubClient.GetDefaultConsumerGroup();
EventHubDescription eventHub = NamespaceManager.CreateFromConnectionString(EhConnectionStringNoPath).GetEventHub(EhEntityPath);
foreach (string partitionId in eventHub.PartitionIds)
{
defaultConsumerGroup.RegisterProcessor<EventHubListener>(new Lease
{
PartitionId = partitionId
}, new EventProcessorCheckpointManager());
Console.WriteLine("Processing : " + partitionId);
}
}
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (EventData eventData in messages)
{
string bytes = Encoding.UTF8.GetString(eventData.GetBytes());
MyData data = JsonConvert.DeserializeObject<MyData>(bytes);
Поскольку я получаю одни и те же сообщения снова и снова, я подозреваю, что мне нужно сделать что-то вроде этого:
string bytes = Encoding.UTF8.GetString(eventData.GetBytes(), eventData.Offset, eventData.SerializedSizeInBytes - eventData.Offset);
Тем не менее, Offset
- это строка, даже если она представляет собой числовое значение (например, "12345"). Документация по context.CheckPointAsync()
заставила его казаться, что это может быть ответ; однако, выдавая, что в конце цикла, кажется, не имеет значения.
Итак, у меня есть два вопроса:
- Что такое офсет? Является ли это тем, что я думаю (т.е. числовым маркером для точки в потоке), и если да, то почему это строка?
- Зачем мне снова получать одни и те же сообщения? Поскольку я понимаю Event Hubs, хотя они гарантируют хотя бы один раз, как только Checkpoint является проблемой, я не должен получать одни и те же сообщения.
РЕДАКТИРОВАТЬ:
Спустя некоторое время, я придумал что-то, что позволяет избежать этой проблемы; однако я, конечно, не стал бы утверждать, что это решение:
var filteredMessages =
messages.Where(a => a.EnqueuedTimeUtc >= _startDate)
.OrderBy(a => a.EnqueuedTimeUtc);
Использование EventProcessorHost
похоже, действительно ухудшило проблему; то есть не только переигрывались исторические события, но они, казалось, воспроизводились в произвольном порядке.
РЕДАКТИРОВАТЬ:
Я наткнулся на эту замечательную статью @Mikhail, которая, похоже, касается моей точной проблемы. Тем не мение; и, предположительно, корень моей проблемы (или один из них, если предположить, что это правильно, то я не уверен, почему использование EventProcessorHost
не просто работает из коробки, как @Mikhail сказал себя в комментариях). Однако версия ICheckpointManager
имеет только один метод интерфейса:
namespace Microsoft.ServiceBus.Messaging
{
public interface ICheckpointManager
{
Task CheckpointAsync(Lease lease, string offset, long sequenceNumber);
}
}
Ответы
Ответ 1
Ваш заголовок должен быть центром событий, а не служебной шиной. По вашему вопросу:
- Хотя центр событий имеет аналогичный дизайн, как Kafka, но одна большая разница в том, что вы сами должны управлять смещениями. Брокер событий-хабов совершенно не знает о вашем смещении группы потребителей.
- Таким образом, event hub sdk предоставляет некоторый класс поддержки для хранения смещения в учетной записи хранилища, но по-прежнему необходимо вызвать контрольную точку вручную после обработки сообщения.