В Java, как я могу создать эквивалент файла контейнера Apache Avro без принудительного использования файла в качестве носителя?

Это в некотором роде выстрел в темноте, если кто-то подкован с реализацией Java Apache Avro, читает это.

Моя задача на высоком уровне состоит в том, чтобы каким-то образом передать некоторые данные avro по сети (например, просто скажем HTTP, но конкретный протокол не так важен для этой цели). В моем контексте у меня есть HttpServletResponse Мне нужно как-то записать эти данные.

Сначала я попытался записать данные как то, что составляло виртуальную версию файла контейнера avro (предположим, что "response" имеет тип HttpServletResponse):

response.setContentType("application/octet-stream");
response.setHeader("Content-transfer-encoding", "binary");
ServletOutputStream outStream = response.getOutputStream();
BufferedOutputStream bos = new BufferedOutputStream(outStream);

Schema someSchema = Schema.parse(".....some valid avro schema....");
GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("somefield", someData);
...

GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
fileWriter.create(someSchema, bos);
fileWriter.append(someRecord);
fileWriter.close();
bos.flush();

Все было в порядке и денди, за исключением того, что оказалось, что Avro не предоставляет способ читать файл контейнера отдельно от фактического файла: DataFileReader имеет только два конструктора:

public DataFileReader(File file, DatumReader<D> reader);

и

public DataFileReader(SeekableInput sin, DatumReader<D> reader);

где SeekableInput - это специальная настраиваемая форма avro, создание которой также заканчивается чтением из файла. Теперь, учитывая это, если есть какой-то способ как-то принудить InputStream к файлу (http://stackoverflow.com/info/578305/create-a-java-file-object-or-equivalent-using-a-byte- array-in-memory-without-a предполагает, что этого не происходит, и я также попытался оглянуться на документацию по Java), этот подход не будет работать, если читатель на другом конце OutputStream получит этот файл контейнера avro ( Я не уверен, почему они разрешили выводить двоичные файлы контейнера avro на произвольный OutputStream, не предоставляя способ прочитать их из соответствующего InputStream на другом конце, но это рядом с точкой). Похоже, что реализация файлового чтения контейнера требует "поисковой" функциональности, которую предоставляет конкретный файл.

Хорошо, так что не похоже, что этот подход сделает то, что я хочу. Как насчет создания ответа JSON, который имитирует файл контейнера avro?

public static Schema WRAPPER_SCHEMA = Schema.parse(
  "{\"type\": \"record\", " +
   "\"name\": \"AvroContainer\", " +
   "\"doc\": \"a JSON avro container file\", " +
   "\"namespace\": \"org.bar.foo\", " +
   "\"fields\": [" +
     "{\"name\": \"schema\", \"type\": \"string\", \"doc\": \"schema representing the included data\"}, " +
     "{\"name\": \"data\", \"type\": \"bytes\", \"doc\": \"packet of data represented by the schema\"}]}"
  );

Я не уверен, что это лучший способ приблизиться к этому с учетом вышеуказанных ограничений, но похоже, что это может сделать трюк. Я положу схему (например, "Schema someSchema" сверху) в виде строки в поле "схема", а затем поместил в австро-двоичную сериализованную форму записи, соответствующую этой схеме (т.е. "GenericRecord" someRecord ") внутри поля данных.

Я действительно хотел узнать о конкретной детали того, что описано ниже, но я подумал, что было бы целесообразно также дать более широкий контекст, чтобы, если есть более высокий подход на высоком уровне, который я мог бы взять ( этот подход работает, но просто не кажется оптимальным), пожалуйста, дайте мне знать.

Мой вопрос заключается в том, что, исходя из этого подхода, основанного на JSON, как написать двоичное представление Avore моей записи в поле "данные" схемы AvroContainer? Например, я встал здесь:

ByteArrayOutputStream baos = new ByteArrayOutputStream();
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
Encoder e = new BinaryEncoder(baos);
datumWriter.write(resultsRecord, e);
e.flush();

GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("schema", someSchema.toString());
someRecord.put("data", ByteBuffer.wrap(baos.toByteArray()));
datumWriter = new GenericDatumWriter<GenericRecord>(WRAPPER_SCHEMA);
JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(baos, JsonEncoding.UTF8);
e = new JsonEncoder(WRAPPER_SCHEMA, jsonGenerator);
datumWriter.write(someRecord, e);
e.flush();

PrintWriter printWriter = response.getWriter(); // recall that response is the HttpServletResponse
response.setContentType("text/plain");
response.setCharacterEncoding("UTF-8");
printWriter.print(baos.toString("UTF-8"));

Сначала я попытался исключить предложение ByteBuffer.wrap, но затем строка

datumWriter.write(someRecord, e);

бросил исключение, что я не мог отбросить массив байтов в ByteBuffer. Достаточно справедливо, похоже, когда класс Encoder (из которого JsonEncoder является подклассом) вызывается для записи объекта avro Bytes, он требует, чтобы ByteBuffer был задан как аргумент. Таким образом, я попробовал инкапсулировать байт [] с помощью java.nio.ByteBuffer.wrap, но когда данные были распечатаны, он был напечатан как прямая серия байтов, без прохождения через австровое шестнадцатеричное представление:

"data": {"bytes": ".....some gibberish other than the expected format...}

Это не кажется правильным. Согласно документации avro, пример байтов, который они дают, говорит о том, что мне нужно поставить объект json, пример которого выглядит как "\ u00FF", и то, что я там вписал, явно не соответствует этому формату. Теперь я хочу знать следующее:

  • Каков пример формата avro bytes? Это похоже на "\ uDEADBEEFDEADBEEF..."?
  • Как я могу принудить свои двоичные данные avro (как вывод BinaryEncoder в массив byte []) в формат, который я могу вставить в объект GenericRecord и правильно ли напечатать его в JSON? Например, мне нужен объект DATA, для которого я могу вызвать некоторый GenericRecord "someRecord.put(" данные ", DATA); с моими сериализованными данными avro внутри?
  • Как бы я затем прочитал эти данные обратно в массив байтов на другом (потребительском) конце, когда ему было предоставлено текстовое представление JSON и он хочет воссоздать GenericRecord, представленный форматом JSON AvroContainer?
  • (повторив вопрос раньше) Есть ли лучший способ, которым я мог бы все это сделать?

Ответы

Ответ 1

Как сказал Кнут, если вы хотите использовать что-то другое, кроме файла, вы можете:

  • использовать SeekableByteArrayInput, как сказал Кнут, за все, что вы можете сделать, в байт-массив
  • Внедрение SeekablInput по-своему - например, если вы извлекаете его из какой-то странной структуры базы данных.
  • Или просто используйте файл. Почему бы и нет?

Это ваши ответы.

Ответ 2

Как я решил, это отправить схемы отдельно от данных. Я установил соединение, которое передает схемы вниз с сервера, а затем отправляю закодированные данные взад и вперед. Вы должны создать внешний объект-обертку следующим образом:

{'name':'Wrapper','type':'record','fields':[
  {'name':'schemaName','type':'string'},
  {'name':'records','type':{'type':'array','items':'bytes'}}
]}

Где вы сначала кодируете свой массив записей один за другим в массив закодированных байтовых массивов. Все в одном массиве должно иметь одну и ту же схему. Затем вы кодируете объект-оболочку с приведенной выше схемой - задайте "schemaName" как имя схемы, которую вы использовали для кодирования массива.

На сервере сначала будет декодирован объект-оболочка. После того, как вы декодируете объект-оболочку, вы знаете имя схемы, и у вас есть массив объектов, которые вы знаете, как декодировать - используйте, как и вы!

Обратите внимание, что вы можете уйти без использования объекта-обертки, если используете протокол типа WebSockets и движок вроде Socket.IO ( для Node.js) Socket.io предоставляет вам канал связи на основе канала между браузером и сервером. В этом случае просто используйте определенную схему для каждого канала, запишите каждое сообщение перед его отправкой. Вы все равно должны делиться схемами, когда инициируется соединение, но если вы используете WebSockets, это легко реализовать. И когда вы закончите, у вас есть произвольное количество строго типизированных двунаправленных потоков между клиентом и сервером.

Ответ 3

В Java и Scala мы попытались использовать начало с помощью кода, сгенерированного с помощью Scala nitro codegen. Началом является то, как библиотека javascript mtth/avsc решила эту проблему . Тем не менее, мы столкнулись с несколькими проблемами сериализации с использованием библиотеки Java, где последовательно были введены ошибочные байты, которые последовательно вставлялись в поток байтов, и мы не могли понять, откуда эти байты.

Конечно, это означало создание нашей собственной реализации Varint с кодировкой ZigZag. Мех.

Вот он:

package com.terradatum.query

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.security.MessageDigest
import java.util.UUID

import akka.actor.ActorSystem
import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import com.nitro.scalaAvro.runtime.GeneratedMessage
import com.terradatum.diagnostics.AkkaLogging
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.elasticsearch.search.SearchHit

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/*
* The original implementation of this helper relied exclusively on using the Header Avro record and inception to create
* the header. That didn't work for us because somehow erroneous bytes were injected into the output.
*
* Specifically:
* 1. 0x08 prepended to the magic
* 2. 0x0020 between the header and the sync marker
*
* Rather than continue to spend a large number of hours trying to troubleshoot why the Avro library was producing such
* erroneous output, we build the Avro Container File using a combination of our own code and Avro library code.
*
* This means that Terradatum code is responsible for the Avro Container File header (including magic, file metadata and
* sync marker) and building the blocks. We only use the Avro library code to build the binary encoding of the Avro
* records.
*
* @see https://avro.apache.org/docs/1.8.1/spec.html#Object+Container+Files
*/
object AvroContainerFileHelpers {

  val magic: ByteBuffer = {
    val magicBytes = "Obj".getBytes ++ Array[Byte](1.toByte)
    val mg = ByteBuffer.allocate(magicBytes.length).put(magicBytes)
    mg.position(0)
    mg
  }

  def makeSyncMarker(): Array[Byte] = {
    val digester = MessageDigest.getInstance("MD5")
    digester.update(s"${UUID.randomUUID}@${System.currentTimeMillis()}".getBytes)
    val marker = ByteBuffer.allocate(16).put(digester.digest()).compact()
    marker.position(0)
    marker.array()
  }

  /*
  * Note that other implementations of avro container files, such as the javascript library
  * mtth/avsc uses "inception" to encode the header, that is, a datum following a header
  * schema should produce valid headers. We originally had attempted to do the same but for
  * an unknown reason two bytes wore being inserted into our header, one at the very beginning
  * of the header before the MAGIC marker, and one right before the syncmarker of the header.
  * We were unable to determine why this wasn't working, and so this solution was used instead
  * where the record/map is encoded per the avro spec manually without the use of "inception."
  */
  def header(schema: Schema, syncMarker: Array[Byte]): Array[Byte] = {
    def avroMap(map: Map[String, ByteBuffer]): Array[Byte] = {
      val mapBytes = map.flatMap {
        case (k, vBuff) =>
          val v = vBuff.array()
          val byteStr = k.getBytes()
          Varint.encodeLong(byteStr.length) ++ byteStr ++ Varint.encodeLong(v.length) ++ v
      }
      Varint.encodeLong(map.size.toLong) ++ mapBytes ++ Varint.encodeLong(0)
    }

    val schemaBytes = schema.toString.getBytes
    val schemaBuffer = ByteBuffer.allocate(schemaBytes.length).put(schemaBytes)
    schemaBuffer.position(0)
    val metadata = Map("avro.schema" -> schemaBuffer)
    magic.array() ++ avroMap(metadata) ++ syncMarker
  }

  def block(binaryRecords: Seq[Array[Byte]], syncMarker: Array[Byte]): Array[Byte] = {
    val countBytes = Varint.encodeLong(binaryRecords.length.toLong)
    val sizeBytes = Varint.encodeLong(binaryRecords.foldLeft(0)(_+_.length).toLong)

    val buff: ArrayBuffer[Byte] = new scala.collection.mutable.ArrayBuffer[Byte]()

    buff.append(countBytes:_*)
    buff.append(sizeBytes:_*)
    binaryRecords.foreach { rec =>
      buff.append(rec:_*)
    }
    buff.append(syncMarker:_*)

    buff.toArray
  }

  def encodeBlock[T](schema: Schema, records: Seq[GenericRecord], syncMarker: Array[Byte]): Array[Byte] = {
    //block(records.map(encodeRecord(schema, _)), syncMarker)
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val out = new ByteArrayOutputStream()
    val binaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    records.foreach(record => writer.write(record, binaryEncoder))
    binaryEncoder.flush()
    val flattenedRecords = out.toByteArray
    out.close()

    val buff: ArrayBuffer[Byte] = new scala.collection.mutable.ArrayBuffer[Byte]()

    val countBytes = Varint.encodeLong(records.length.toLong)
    val sizeBytes = Varint.encodeLong(flattenedRecords.length.toLong)

    buff.append(countBytes:_*)
    buff.append(sizeBytes:_*)
    buff.append(flattenedRecords:_*)
    buff.append(syncMarker:_*)

    buff.toArray
  }

  def encodeRecord[R <: GeneratedMessage with com.nitro.scalaAvro.runtime.Message[R]: ClassTag](
      entity: R
  ): Array[Byte] =
    encodeRecord(entity.companion.schema, entity.toMutable)

  def encodeRecord(schema: Schema, record: GenericRecord): Array[Byte] = {
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val out = new ByteArrayOutputStream()
    val binaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(record, binaryEncoder)
    binaryEncoder.flush()
    val bytes = out.toByteArray
    out.close()
    bytes
  }
}

/**
  * Encoding of integers with variable-length encoding.
  *
  * The avro specification uses a variable length encoding for integers and longs.
  * If the most significant bit in a integer or long byte is 0 then it knows that no
  * more bytes are needed, if the most significant bit is 1 then it knows that at least one
  * more byte is needed. In signed ints and longs the most significant bit is traditionally
  * used to represent the sign of the integer or long, but for us it used to encode whether
  * more bytes are needed. To get around this limitation we zig-zag through whole numbers such that
  * negatives are odd numbers and positives are even numbers:
  *
  * i.e. -1, -2, -3 would be encoded as 1, 3, 5, and so on
  * while 1,  2,  3 would be encoded as 2, 4, 6, and so on.
  *
  * More information is available in the avro specification here:
  * @see http://lucene.apache.org/core/3_5_0/fileformats.html#VInt
  *      https://developers.google.com/protocol-buffers/docs/encoding?csw=1#types
  */
object Varint {

  import scala.collection.mutable

  def encodeLong(longVal: Long): Array[Byte] = {
    val buff = new ArrayBuffer[Byte]()
    Varint.zigZagSignedLong(longVal, buff)
    buff.toArray[Byte]
  }

  def encodeInt(intVal: Int): Array[Byte] = {
    val buff = new ArrayBuffer[Byte]()
    Varint.zigZagSignedInt(intVal, buff)
    buff.toArray[Byte]
  }

  def zigZagSignedLong[T <: mutable.Buffer[Byte]](x: Long, dest: T): Unit = {
    // sign to even/odd mapping: http://code.google.com/apis/protocolbuffers/docs/encoding.html#types
    writeUnsignedLong((x << 1) ^ (x >> 63), dest)
  }

  def writeUnsignedLong[T <: mutable.Buffer[Byte]](v: Long, dest: T): Unit = {
    var x = v
    while ((x & 0xFFFFFFFFFFFFFF80L) != 0L) {
      dest += ((x & 0x7F) | 0x80).toByte
      x >>>= 7
    }
    dest += (x & 0x7F).toByte
  }

  def zigZagSignedInt[T <: mutable.Buffer[Byte]](x: Int, dest: T): Unit = {
    writeUnsignedInt((x << 1) ^ (x >> 31), dest)
  }

  def writeUnsignedInt[T <: mutable.Buffer[Byte]](v: Int, dest: T): Unit = {
    var x = v
    while ((x & 0xFFFFF80) != 0L) {
      dest += ((x & 0x7F) | 0x80).toByte
      x >>>= 7
    }
    dest += (x & 0x7F).toByte
  }
}