В Java, как я могу создать эквивалент файла контейнера Apache Avro без принудительного использования файла в качестве носителя?
Это в некотором роде выстрел в темноте, если кто-то подкован с реализацией Java Apache Avro, читает это.
Моя задача на высоком уровне состоит в том, чтобы каким-то образом передать некоторые данные avro по сети (например, просто скажем HTTP, но конкретный протокол не так важен для этой цели). В моем контексте у меня есть HttpServletResponse Мне нужно как-то записать эти данные.
Сначала я попытался записать данные как то, что составляло виртуальную версию файла контейнера avro (предположим, что "response" имеет тип HttpServletResponse):
response.setContentType("application/octet-stream");
response.setHeader("Content-transfer-encoding", "binary");
ServletOutputStream outStream = response.getOutputStream();
BufferedOutputStream bos = new BufferedOutputStream(outStream);
Schema someSchema = Schema.parse(".....some valid avro schema....");
GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("somefield", someData);
...
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
fileWriter.create(someSchema, bos);
fileWriter.append(someRecord);
fileWriter.close();
bos.flush();
Все было в порядке и денди, за исключением того, что оказалось, что Avro не предоставляет способ читать файл контейнера отдельно от фактического файла: DataFileReader имеет только два конструктора:
public DataFileReader(File file, DatumReader<D> reader);
и
public DataFileReader(SeekableInput sin, DatumReader<D> reader);
где SeekableInput - это специальная настраиваемая форма avro, создание которой также заканчивается чтением из файла. Теперь, учитывая это, если есть какой-то способ как-то принудить InputStream к файлу (http://stackoverflow.com/info/578305/create-a-java-file-object-or-equivalent-using-a-byte- array-in-memory-without-a предполагает, что этого не происходит, и я также попытался оглянуться на документацию по Java), этот подход не будет работать, если читатель на другом конце OutputStream получит этот файл контейнера avro ( Я не уверен, почему они разрешили выводить двоичные файлы контейнера avro на произвольный OutputStream, не предоставляя способ прочитать их из соответствующего InputStream на другом конце, но это рядом с точкой). Похоже, что реализация файлового чтения контейнера требует "поисковой" функциональности, которую предоставляет конкретный файл.
Хорошо, так что не похоже, что этот подход сделает то, что я хочу. Как насчет создания ответа JSON, который имитирует файл контейнера avro?
public static Schema WRAPPER_SCHEMA = Schema.parse(
"{\"type\": \"record\", " +
"\"name\": \"AvroContainer\", " +
"\"doc\": \"a JSON avro container file\", " +
"\"namespace\": \"org.bar.foo\", " +
"\"fields\": [" +
"{\"name\": \"schema\", \"type\": \"string\", \"doc\": \"schema representing the included data\"}, " +
"{\"name\": \"data\", \"type\": \"bytes\", \"doc\": \"packet of data represented by the schema\"}]}"
);
Я не уверен, что это лучший способ приблизиться к этому с учетом вышеуказанных ограничений, но похоже, что это может сделать трюк. Я положу схему (например, "Schema someSchema" сверху) в виде строки в поле "схема", а затем поместил в австро-двоичную сериализованную форму записи, соответствующую этой схеме (т.е. "GenericRecord" someRecord ") внутри поля данных.
Я действительно хотел узнать о конкретной детали того, что описано ниже, но я подумал, что было бы целесообразно также дать более широкий контекст, чтобы, если есть более высокий подход на высоком уровне, который я мог бы взять ( этот подход работает, но просто не кажется оптимальным), пожалуйста, дайте мне знать.
Мой вопрос заключается в том, что, исходя из этого подхода, основанного на JSON, как написать двоичное представление Avore моей записи в поле "данные" схемы AvroContainer? Например, я встал здесь:
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
Encoder e = new BinaryEncoder(baos);
datumWriter.write(resultsRecord, e);
e.flush();
GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("schema", someSchema.toString());
someRecord.put("data", ByteBuffer.wrap(baos.toByteArray()));
datumWriter = new GenericDatumWriter<GenericRecord>(WRAPPER_SCHEMA);
JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(baos, JsonEncoding.UTF8);
e = new JsonEncoder(WRAPPER_SCHEMA, jsonGenerator);
datumWriter.write(someRecord, e);
e.flush();
PrintWriter printWriter = response.getWriter(); // recall that response is the HttpServletResponse
response.setContentType("text/plain");
response.setCharacterEncoding("UTF-8");
printWriter.print(baos.toString("UTF-8"));
Сначала я попытался исключить предложение ByteBuffer.wrap, но затем строка
datumWriter.write(someRecord, e);
бросил исключение, что я не мог отбросить массив байтов в ByteBuffer. Достаточно справедливо, похоже, когда класс Encoder (из которого JsonEncoder является подклассом) вызывается для записи объекта avro Bytes, он требует, чтобы ByteBuffer был задан как аргумент. Таким образом, я попробовал инкапсулировать байт [] с помощью java.nio.ByteBuffer.wrap, но когда данные были распечатаны, он был напечатан как прямая серия байтов, без прохождения через австровое шестнадцатеричное представление:
"data": {"bytes": ".....some gibberish other than the expected format...}
Это не кажется правильным. Согласно документации avro, пример байтов, который они дают, говорит о том, что мне нужно поставить объект json, пример которого выглядит как "\ u00FF", и то, что я там вписал, явно не соответствует этому формату. Теперь я хочу знать следующее:
- Каков пример формата avro bytes? Это похоже на "\ uDEADBEEFDEADBEEF..."?
- Как я могу принудить свои двоичные данные avro (как вывод BinaryEncoder в массив byte []) в формат, который я могу вставить в объект GenericRecord и правильно ли напечатать его в JSON? Например, мне нужен объект DATA, для которого я могу вызвать некоторый GenericRecord "someRecord.put(" данные ", DATA); с моими сериализованными данными avro внутри?
- Как бы я затем прочитал эти данные обратно в массив байтов на другом (потребительском) конце, когда ему было предоставлено текстовое представление JSON и он хочет воссоздать GenericRecord, представленный форматом JSON AvroContainer?
- (повторив вопрос раньше) Есть ли лучший способ, которым я мог бы все это сделать?
Ответы
Ответ 1
Как сказал Кнут, если вы хотите использовать что-то другое, кроме файла, вы можете:
- использовать SeekableByteArrayInput, как сказал Кнут, за все, что вы можете сделать, в байт-массив
- Внедрение SeekablInput по-своему - например, если вы извлекаете его из какой-то странной структуры базы данных.
- Или просто используйте файл. Почему бы и нет?
Это ваши ответы.
Ответ 2
Как я решил, это отправить схемы отдельно от данных. Я установил соединение, которое передает схемы вниз с сервера, а затем отправляю закодированные данные взад и вперед. Вы должны создать внешний объект-обертку следующим образом:
{'name':'Wrapper','type':'record','fields':[
{'name':'schemaName','type':'string'},
{'name':'records','type':{'type':'array','items':'bytes'}}
]}
Где вы сначала кодируете свой массив записей один за другим в массив закодированных байтовых массивов. Все в одном массиве должно иметь одну и ту же схему. Затем вы кодируете объект-оболочку с приведенной выше схемой - задайте "schemaName" как имя схемы, которую вы использовали для кодирования массива.
На сервере сначала будет декодирован объект-оболочка. После того, как вы декодируете объект-оболочку, вы знаете имя схемы, и у вас есть массив объектов, которые вы знаете, как декодировать - используйте, как и вы!
Обратите внимание, что вы можете уйти без использования объекта-обертки, если используете протокол типа WebSockets
и движок вроде Socket.IO
( для Node.js
) Socket.io предоставляет вам канал связи на основе канала между браузером и сервером. В этом случае просто используйте определенную схему для каждого канала, запишите каждое сообщение перед его отправкой. Вы все равно должны делиться схемами, когда инициируется соединение, но если вы используете WebSockets
, это легко реализовать. И когда вы закончите, у вас есть произвольное количество строго типизированных двунаправленных потоков между клиентом и сервером.
Ответ 3
В Java и Scala мы попытались использовать начало с помощью кода, сгенерированного с помощью Scala nitro codegen. Началом является то, как библиотека javascript mtth/avsc решила эту проблему . Тем не менее, мы столкнулись с несколькими проблемами сериализации с использованием библиотеки Java, где последовательно были введены ошибочные байты, которые последовательно вставлялись в поток байтов, и мы не могли понять, откуда эти байты.
Конечно, это означало создание нашей собственной реализации Varint с кодировкой ZigZag. Мех.
Вот он:
package com.terradatum.query
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.security.MessageDigest
import java.util.UUID
import akka.actor.ActorSystem
import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import com.nitro.scalaAvro.runtime.GeneratedMessage
import com.terradatum.diagnostics.AkkaLogging
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.elasticsearch.search.SearchHit
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
/*
* The original implementation of this helper relied exclusively on using the Header Avro record and inception to create
* the header. That didn't work for us because somehow erroneous bytes were injected into the output.
*
* Specifically:
* 1. 0x08 prepended to the magic
* 2. 0x0020 between the header and the sync marker
*
* Rather than continue to spend a large number of hours trying to troubleshoot why the Avro library was producing such
* erroneous output, we build the Avro Container File using a combination of our own code and Avro library code.
*
* This means that Terradatum code is responsible for the Avro Container File header (including magic, file metadata and
* sync marker) and building the blocks. We only use the Avro library code to build the binary encoding of the Avro
* records.
*
* @see https://avro.apache.org/docs/1.8.1/spec.html#Object+Container+Files
*/
object AvroContainerFileHelpers {
val magic: ByteBuffer = {
val magicBytes = "Obj".getBytes ++ Array[Byte](1.toByte)
val mg = ByteBuffer.allocate(magicBytes.length).put(magicBytes)
mg.position(0)
mg
}
def makeSyncMarker(): Array[Byte] = {
val digester = MessageDigest.getInstance("MD5")
digester.update(s"${UUID.randomUUID}@${System.currentTimeMillis()}".getBytes)
val marker = ByteBuffer.allocate(16).put(digester.digest()).compact()
marker.position(0)
marker.array()
}
/*
* Note that other implementations of avro container files, such as the javascript library
* mtth/avsc uses "inception" to encode the header, that is, a datum following a header
* schema should produce valid headers. We originally had attempted to do the same but for
* an unknown reason two bytes wore being inserted into our header, one at the very beginning
* of the header before the MAGIC marker, and one right before the syncmarker of the header.
* We were unable to determine why this wasn't working, and so this solution was used instead
* where the record/map is encoded per the avro spec manually without the use of "inception."
*/
def header(schema: Schema, syncMarker: Array[Byte]): Array[Byte] = {
def avroMap(map: Map[String, ByteBuffer]): Array[Byte] = {
val mapBytes = map.flatMap {
case (k, vBuff) =>
val v = vBuff.array()
val byteStr = k.getBytes()
Varint.encodeLong(byteStr.length) ++ byteStr ++ Varint.encodeLong(v.length) ++ v
}
Varint.encodeLong(map.size.toLong) ++ mapBytes ++ Varint.encodeLong(0)
}
val schemaBytes = schema.toString.getBytes
val schemaBuffer = ByteBuffer.allocate(schemaBytes.length).put(schemaBytes)
schemaBuffer.position(0)
val metadata = Map("avro.schema" -> schemaBuffer)
magic.array() ++ avroMap(metadata) ++ syncMarker
}
def block(binaryRecords: Seq[Array[Byte]], syncMarker: Array[Byte]): Array[Byte] = {
val countBytes = Varint.encodeLong(binaryRecords.length.toLong)
val sizeBytes = Varint.encodeLong(binaryRecords.foldLeft(0)(_+_.length).toLong)
val buff: ArrayBuffer[Byte] = new scala.collection.mutable.ArrayBuffer[Byte]()
buff.append(countBytes:_*)
buff.append(sizeBytes:_*)
binaryRecords.foreach { rec =>
buff.append(rec:_*)
}
buff.append(syncMarker:_*)
buff.toArray
}
def encodeBlock[T](schema: Schema, records: Seq[GenericRecord], syncMarker: Array[Byte]): Array[Byte] = {
//block(records.map(encodeRecord(schema, _)), syncMarker)
val writer = new GenericDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val binaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
records.foreach(record => writer.write(record, binaryEncoder))
binaryEncoder.flush()
val flattenedRecords = out.toByteArray
out.close()
val buff: ArrayBuffer[Byte] = new scala.collection.mutable.ArrayBuffer[Byte]()
val countBytes = Varint.encodeLong(records.length.toLong)
val sizeBytes = Varint.encodeLong(flattenedRecords.length.toLong)
buff.append(countBytes:_*)
buff.append(sizeBytes:_*)
buff.append(flattenedRecords:_*)
buff.append(syncMarker:_*)
buff.toArray
}
def encodeRecord[R <: GeneratedMessage with com.nitro.scalaAvro.runtime.Message[R]: ClassTag](
entity: R
): Array[Byte] =
encodeRecord(entity.companion.schema, entity.toMutable)
def encodeRecord(schema: Schema, record: GenericRecord): Array[Byte] = {
val writer = new GenericDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val binaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
writer.write(record, binaryEncoder)
binaryEncoder.flush()
val bytes = out.toByteArray
out.close()
bytes
}
}
/**
* Encoding of integers with variable-length encoding.
*
* The avro specification uses a variable length encoding for integers and longs.
* If the most significant bit in a integer or long byte is 0 then it knows that no
* more bytes are needed, if the most significant bit is 1 then it knows that at least one
* more byte is needed. In signed ints and longs the most significant bit is traditionally
* used to represent the sign of the integer or long, but for us it used to encode whether
* more bytes are needed. To get around this limitation we zig-zag through whole numbers such that
* negatives are odd numbers and positives are even numbers:
*
* i.e. -1, -2, -3 would be encoded as 1, 3, 5, and so on
* while 1, 2, 3 would be encoded as 2, 4, 6, and so on.
*
* More information is available in the avro specification here:
* @see http://lucene.apache.org/core/3_5_0/fileformats.html#VInt
* https://developers.google.com/protocol-buffers/docs/encoding?csw=1#types
*/
object Varint {
import scala.collection.mutable
def encodeLong(longVal: Long): Array[Byte] = {
val buff = new ArrayBuffer[Byte]()
Varint.zigZagSignedLong(longVal, buff)
buff.toArray[Byte]
}
def encodeInt(intVal: Int): Array[Byte] = {
val buff = new ArrayBuffer[Byte]()
Varint.zigZagSignedInt(intVal, buff)
buff.toArray[Byte]
}
def zigZagSignedLong[T <: mutable.Buffer[Byte]](x: Long, dest: T): Unit = {
// sign to even/odd mapping: http://code.google.com/apis/protocolbuffers/docs/encoding.html#types
writeUnsignedLong((x << 1) ^ (x >> 63), dest)
}
def writeUnsignedLong[T <: mutable.Buffer[Byte]](v: Long, dest: T): Unit = {
var x = v
while ((x & 0xFFFFFFFFFFFFFF80L) != 0L) {
dest += ((x & 0x7F) | 0x80).toByte
x >>>= 7
}
dest += (x & 0x7F).toByte
}
def zigZagSignedInt[T <: mutable.Buffer[Byte]](x: Int, dest: T): Unit = {
writeUnsignedInt((x << 1) ^ (x >> 31), dest)
}
def writeUnsignedInt[T <: mutable.Buffer[Byte]](v: Int, dest: T): Unit = {
var x = v
while ((x & 0xFFFFF80) != 0L) {
dest += ((x & 0x7F) | 0x80).toByte
x >>>= 7
}
dest += (x & 0x7F).toByte
}
}