Play 2.x: загрузка реактивного файла с помощью Iteratees
Я начну с вопроса: как использовать Scala API Iteratee
для загрузки файла в хранилище облаков (в моем случае это хранилище Azure Blob, но я не думаю, что это самое главное сейчас)
Фон:
Мне нужно записать в блок размером около 1 МБ для хранения больших медиафайлов (300 МБ +) в качестве Azure BlockBlobs
. К сожалению, мои знания Scala все еще плохие (мой проект основан на Java, и единственное использование для Scala в нем будет контроллером загрузки).
Я попытался с помощью этого кода: Почему вы вызываете ошибку или выполняете в Iteratee BodyParser запрос зависает в Play Framework 2.0? (как Input
Iteratee
) - он работает достаточно хорошо, но каждый Element
, который я мог использовать, имеет размер 8192 байта, поэтому он слишком мал для отправки в облако файлов сотен мегабайт.
Я должен сказать, что совершенно новый подход ко мне, и, скорее всего, я что-то неправильно понял (не хочу сказать, что я все неправильно понял) >
Я буду благодарен за любой намек или ссылку, которая поможет мне в этой теме. Если есть какой-либо образец аналогичного использования, это был бы лучший вариант для меня, чтобы получить эту идею.
Ответы
Ответ 1
В основном то, что вам нужно, это повторный ввод в виде более крупных блоков, 1024 * 1024 байта.
Сначала давайте иметь Iteratee
, который будет потреблять до 1 м байт (ok, чтобы последний кусок меньше)
val consumeAMB =
Traversable.takeUpTo[Array[Byte]](1024*1024) &>> Iteratee.consume()
Используя это, мы можем построить Enumeratee
(адаптер), который будет перегруппировать куски, используя API под названием grouped:
val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
Enumeratee.grouped(consumeAMB)
Здесь grouped использует Iteratee
, чтобы определить, сколько нужно положить в каждый кусок. Для этого он использует наш consumeAMB. Это означает, что результатом является Enumeratee
, который переписывает ввод в Array[Byte]
1 МБ.
Теперь нам нужно написать BodyParser
, который будет использовать метод Iteratee.foldM
для отправки каждого фрагмента байтов:
val writeToStore: Iteratee[Array[Byte],_] =
Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) =>
// write bytes and return next handle, probable in a Future
}
foldM передает состояние и использует его в своей переданной функции (S,Input[Array[Byte]]) => Future[S]
, чтобы вернуть новое состояние будущего. foldM не будет вызывать функцию снова до тех пор, пока Future
не будет завершен, и есть доступный фрагмент ввода.
И анализатор тела будет повторять ввод и вставлять его в хранилище:
BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))
Возврат вправо указывает, что вы возвращаете тело к концу разбора тела (который здесь является обработчиком).
Ответ 2
Если ваша цель состоит в потоке на S3, вот помощник, который я реализовал и протестировал:
def uploadStream(bucket: String, key: String, enum: Enumerator[Array[Byte]])
(implicit ec: ExecutionContext): Future[CompleteMultipartUploadResult] = {
import scala.collection.JavaConversions._
val initRequest = new InitiateMultipartUploadRequest(bucket, key)
val initResponse = s3.initiateMultipartUpload(initRequest)
val uploadId = initResponse.getUploadId
val rechunker: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped {
Traversable.takeUpTo[Array[Byte]](5 * 1024 * 1024) &>> Iteratee.consume()
}
val uploader = Iteratee.foldM[Array[Byte], Seq[PartETag]](Seq.empty) { case (etags, bytes) =>
val uploadRequest = new UploadPartRequest()
.withBucketName(bucket)
.withKey(key)
.withPartNumber(etags.length + 1)
.withUploadId(uploadId)
.withInputStream(new ByteArrayInputStream(bytes))
.withPartSize(bytes.length)
val etag = Future { s3.uploadPart(uploadRequest).getPartETag }
etag.map(etags :+ _)
}
val futETags = enum &> rechunker |>>> uploader
futETags.map { etags =>
val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toBuffer[PartETag])
s3.completeMultipartUpload(compRequest)
}.recoverWith { case e: Exception =>
s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId))
Future.failed(e)
}
}
Ответ 3
Для тех, кто также пытается найти решение этой проблемы с потоками, вместо того, чтобы писать совершенно новый BodyParser, вы также можете использовать то, что уже было реализовано в parse.multipartFormData.
Вы можете реализовать что-то вроде ниже, чтобы перезаписать обработчик по умолчанию handleFilePartAsTemporaryFile.
def handleFilePartAsS3FileUpload: PartHandler[FilePart[String]] = {
handleFilePart {
case FileInfo(partName, filename, contentType) =>
(rechunkAdapter &>> writeToS3).map {
_ =>
val compRequest = new CompleteMultipartUploadRequest(...)
amazonS3Client.completeMultipartUpload(compRequest)
...
}
}
}
def multipartFormDataS3: BodyParser[MultipartFormData[String]] = multipartFormData(handleFilePartAsS3FileUpload)
Я могу выполнить эту работу, но я по-прежнему не уверен, что весь процесс загрузки потоковым. Я попробовал несколько больших файлов, кажется, что загрузка S3 начинается только тогда, когда весь файл был отправлен с клиентской стороны.
Я посмотрел на реализацию вышеописанного анализатора, и я думаю, что все связано с Iteratee, поэтому файл должен быть потоковым.
Если у кого-то есть некоторое представление об этом, это будет очень полезно.
Ответ 4
добавить в конфигурационный файл следующее
play.http.parser.maxMemoryBuffer = 256K