Deserialize файл Avro с С#
Я не могу найти способ десериализации файла Apache Avro с С#. Файл Avro - это файл, созданный Функция архива в концентраторах событий Microsoft Azure.
С помощью Java я могу использовать Avro Tools из Apache для преобразования файла в JSON:
java -jar avro-tools-1.8.1.jar tojson --pretty inputfile > output.json
Использование пакета NuGet Microsoft.Hadoop.Avro Я могу извлечь SequenceNumber
, Offset
и EnqueuedTimeUtc
, но так как я не знаю, какой тип использовать для Body
генерируется исключение. Я пробовал с Dictionary<string, object>
и другими типами.
static void Main(string[] args)
{
var fileName = "...";
using (Stream stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read))
{
using (var reader = AvroContainer.CreateReader<EventData>(stream))
{
using (var streamReader = new SequentialReader<EventData>(reader))
{
var record = streamReader.Objects.FirstOrDefault();
}
}
}
}
[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData
{
[DataMember(Name = "SequenceNumber")]
public long SequenceNumber { get; set; }
[DataMember(Name = "Offset")]
public string Offset { get; set; }
[DataMember(Name = "EnqueuedTimeUtc")]
public string EnqueuedTimeUtc { get; set; }
[DataMember(Name = "Body")]
public foo Body { get; set; }
// More properties...
}
Схема выглядит следующим образом:
{
"type": "record",
"name": "EventData",
"namespace": "Microsoft.ServiceBus.Messaging",
"fields": [
{
"name": "SequenceNumber",
"type": "long"
},
{
"name": "Offset",
"type": "string"
},
{
"name": "EnqueuedTimeUtc",
"type": "string"
},
{
"name": "SystemProperties",
"type": {
"type": "map",
"values": [ "long", "double", "string", "bytes" ]
}
},
{
"name": "Properties",
"type": {
"type": "map",
"values": [ "long", "double", "string", "bytes" ]
}
},
{
"name": "Body",
"type": [ "null", "bytes" ]
}
]
}
Ответы
Ответ 1
Мне удалось получить полный доступ к данным с помощью dynamic
. Здесь приведен код для доступа к необработанным данным body
, который хранится в виде массива байтов. В моем случае эти байты содержат JSON с кодировкой UTF8, но, конечно, это зависит от того, как вы изначально создали экземпляры EventData
, которые вы опубликовали в Event Hub:
using (var reader = AvroContainer.CreateGenericReader(stream))
{
while (reader.MoveNext())
{
foreach (dynamic record in reader.Current.Objects)
{
var sequenceNumber = record.SequenceNumber;
var bodyText = Encoding.UTF8.GetString(record.Body);
Console.WriteLine($"{sequenceNumber}: {bodyText}");
}
}
}
Если кто-то может опубликовать статически типизированное решение, я его выберу, но учитывая, что большая задержка в любой системе почти наверняка будет связана с блогами блога Event Hub, я бы не стал беспокоиться о производительности синтаксического анализа.:)
Ответ 2
Этот Gist показывает, как десериализовать захват узла события с помощью С# с помощью Microsoft.Hadoop.Avro2, который имеет то преимущество, что оба .NET Framework 4.5 и .NET Standard 1.6:
var connectionString = "<Azure event hub capture storage account connection string>";
var containerName = "<Azure event hub capture container name>";
var blobName = "<Azure event hub capture BLOB name (ends in .avro)>";
var storageAccount = CloudStorageAccount.Parse(connectionString);
var blobClient = storageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference(containerName);
var blob = container.GetBlockBlobReference(blobName);
using (var stream = blob.OpenRead())
using (var reader = AvroContainer.CreateGenericReader(stream))
while (reader.MoveNext())
foreach (dynamic result in reader.Current.Objects)
{
var record = new AvroEventData(result);
record.Dump();
}
public struct AvroEventData
{
public AvroEventData(dynamic record)
{
SequenceNumber = (long) record.SequenceNumber;
Offset = (string) record.Offset;
DateTime.TryParse((string) record.EnqueuedTimeUtc, out var enqueuedTimeUtc);
EnqueuedTimeUtc = enqueuedTimeUtc;
SystemProperties = (Dictionary<string, object>) record.SystemProperties;
Properties = (Dictionary<string, object>) record.Properties;
Body = (byte[]) record.Body;
}
public long SequenceNumber { get; set; }
public string Offset { get; set; }
public DateTime EnqueuedTimeUtc { get; set; }
public Dictionary<string, object> SystemProperties { get; set; }
public Dictionary<string, object> Properties { get; set; }
public byte[] Body { get; set; }
}
-
Ссылки NuGet:
- Microsoft.Hadoop.Avro2 (1.2.1 работает)
- WindowsAzure.Storage(8.3.0 работает)
-
Пространство имен:
- Microsoft.Hadoop.Avro.Container
- Microsoft.WindowsAzure.Storage
Ответ 3
Наконец-то я смог заставить это работать с библиотекой/инфраструктурой Apache С#.
Я некоторое время задерживался, потому что функция Capture концентраторов Azure Event иногда выводит файл без содержимого сообщения.
Возможно, у меня также возникла проблема с тем, как сообщения были первоначально сериализованы в объект EventData.
Код ниже был для файла, сохраненного на диске из контейнера блочного захвата.
var dataFileReader = DataFileReader<EventData>.OpenReader(file);
foreach (var record in dataFileReader.NextEntries)
{
// Do work on EventData object
}
Это также работает с использованием объекта GenericRecord.
var dataFileReader = DataFileReader<GenericRecord>.OpenReader(file);
Это потребовало определенных усилий, чтобы выяснить. Однако теперь я согласен, что функция Azure Event Hubs Capture - отличная функция для резервного копирования всех событий. Я все еще чувствую, что они должны сделать формат факультативным, как и при работе с Stream Analytic, но, возможно, я привык к Avro.
Ответ 4
Ваши оставшиеся типы, я подозреваю, должны быть определены как:
[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
[KnownType(typeof(Dictionary<string, object>))]
public class EventData
{
[DataMember]
public IDictionary<string, object> SystemProperties { get; set; }
[DataMember]
public IDictionary<string, object> Properties { get; set; }
[DataMember]
public byte[] Body { get; set; }
}
Даже если Body
является объединением null
и bytes
, это сопоставляется с a nullable
byte[]
.
В С# массивы всегда являются ссылочными типами, поэтому может быть null
и контракт выполнен.
Ответ 5
Вы также можете использовать атрибут NullableSchema
чтобы пометить тело как объединение байтов и нулей. Это позволит вам использовать строго типизированный интерфейс.
[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData
{
[DataMember(Name = "SequenceNumber")]
public long SequenceNumber { get; set; }
[DataMember(Name = "Offset")]
public string Offset { get; set; }
[DataMember(Name = "EnqueuedTimeUtc")]
public string EnqueuedTimeUtc { get; set; }
[DataMember(Name = "Body")]
[NullableSchema]
public foo Body { get; set; }
}
Ответ 6
Я всегда получаю исключение. Указанный аргумент находится вне диапазона допустимых значений. Имя параметра: размер и обнаружил, что об этой проблеме уже сообщалось в этой теме https://github.com/Azure/azure-sdk-for-net/выпуски /3709. Я использую .Net Core 2.2, Microsoft.Hadoop.Avro-Core 1.1.19, Microsoft.Azure.Storage.Blob 10.0.0
Любой ключ, чтобы решить эту проблему, я много пробовал без удачи?
Ответ 7
Для людей, имеющих проблемы с сериализацией/десериализацией данных Apache Avro в С#, я создал небольшую библиотеку, которая является интерфейсом для Microsoft.Hadoop.Avro:
https://github.com/AdrianStrugala/AvroConvert
https://www.nuget.org/packages/AvroConvert
Использование так же просто, как:
byte[] avroFileContent = File.ReadAllBytes(fileName);
Dictionary<string, object> result = AvroConvert.Deserialize(avroFileContent);
//or if u know the model of the data
MyModel result = AvroConvert.Deserialize<MyModel>(avroFileContent);