Труба потока к s3.upload()
В настоящее время я использую плагин node.js, называемый s3-upload-stream, чтобы передавать очень большие файлы на Amazon S3. Он использует многопроцессорный API и по большей части работает очень хорошо.
Однако этот модуль показывает свой возраст, и я уже должен был внести в него изменения (автор тоже его устарел). Сегодня я столкнулся с другим вопросом с Amazon, и мне бы очень хотелось взять рекомендацию автора и начать использовать официальный aws-sdk для выполнения моих загрузок.
НО.
Официальный SDK, похоже, не поддерживает трубопроводы до s3.upload()
. Природа s3.upload заключается в том, что вы должны передать читаемый поток в качестве аргумента конструктору S3.
У меня есть примерно 120 модулей пользовательского кода, которые выполняют различные обработки файлов, и они не зависят от конечного пункта назначения их вывода. Двигатель передает им доступный для записи выходной поток, и они подключаются к нему. Я не могу передать им объект AWS.S3
и попросить их называть upload()
на нем без добавления кода ко всем модулям. Причина, по которой я использовал s3-upload-stream
, состояла в том, что она поддерживала трубопровод.
Есть ли способ сделать aws-sdk s3.upload()
что-то, что я могу передать потоку?
Ответы
Ответ 1
Оберните функцию S3 upload()
потоком node.js stream.PassThrough()
.
Вот пример:
inputStream
.pipe(uploadFromStream(s3));
function uploadFromStream(s3) {
var pass = new stream.PassThrough();
var params = {Bucket: BUCKET, Key: KEY, Body: pass};
s3.upload(params, function(err, data) {
console.log(err, data);
});
return pass;
}
Ответ 2
Немного запоздалый ответ, это может помочь кому-то еще, надеюсь. Вы можете вернуть как записываемый поток, так и обещание, чтобы вы могли получить данные ответа после завершения загрузки.
const AWS = require('aws-sdk');
const stream = require('stream');
const uploadStream = ({ Bucket, Key }) => {
const s3 = new AWS.S3();
const pass = new stream.PassThrough();
return {
writeStream: pass,
promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
};
}
И вы можете использовать функцию следующим образом:
const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'});
const readStream = fs.createReadStream('/path/to/yourfile.mp4');
readStream.pipe(writeStream);
promise.then(console.log);
Ответ 3
В принятом ответе функция заканчивается до завершения загрузки, и, следовательно, она неверна. Код ниже правильно передает данные из читаемого потока.
Загрузить ссылку
async function uploadReadableStream(stream) {
const params = {Bucket: bucket, Key: key, Body: stream};
return s3.upload(params).promise();
}
async function upload() {
const readable = getSomeReadableStream();
const results = await uploadReadableStream(readable);
console.log('upload complete', results);
}
Вы также можете пойти дальше и вывести информацию о прогрессе, используя ManagedUpload
как таковой:
const manager = s3.upload(params);
manager.on('httpUploadProgress', (progress) => {
console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
});
Ссылка на ManagedUpload
Список доступных событий
Ответ 4
Тип сценария решения:
Этот пример использует:
import * as AWS from "aws-sdk";
import * as fsExtra from "fs-extra";
import * as zlib from "zlib";
import * as stream from "stream";
И асинхронная функция:
public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> {
const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => {
const passT = new stream.PassThrough();
return {
writeStream: passT,
promise: S3.upload({ Bucket, Key, Body: passT }).promise(),
};
};
const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key);
fsExtra.createReadStream(filePath).pipe(writeStream); // NOTE: Addition You can compress to zip by .pipe(zlib.createGzip()).pipe(writeStream)
let output = true;
await promise.catch((reason)=> { output = false; console.log(reason);});
return output;
}
Вызовите этот метод где-то вроде:
let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);
Ответ 5
Для тех, кто жалуется, что, когда они используют функцию загрузки API s3 и файл нулевого байта, заканчивают на s3 (@Radar155 и @gabo) - у меня также была эта проблема.
Создайте второй поток PassThrough и просто передайте все данные от первого ко второму и передайте ссылку на эту секунду в s3. Вы можете сделать это несколькими различными способами - возможно, грязный способ - прослушать событие "data" в первом потоке и затем записать те же данные во второй поток - аналогично событию "end" - просто вызвать конечная функция во втором потоке. Я понятия не имею, является ли это ошибкой в API-интерфейсе aws, версией узла или какой-либо другой проблемой, но она обошла эту проблему для меня.
Вот как это может выглядеть:
var PassThroughStream = require('stream').PassThrough;
var srcStream = new PassThroughStream();
var rstream = fs.createReadStream('Learning/stocktest.json');
var sameStream = rstream.pipe(srcStream);
// interesting note: (srcStream == sameStream) at this point
var destStream = new PassThroughStream();
// call your s3.upload function here - passing in the destStream as the Body parameter
srcStream.on('data', function (chunk) {
destStream.write(chunk);
});
srcStream.on('end', function () {
dataStream.end();
});
Ответ 6
Если это помогает кому-то, с кем я смог успешно перевести с клиента на s3:
https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a
Серверный код предполагает, что req
является объектом потока, в моем случае он был отправлен от клиента с информацией о файле, установленной в заголовках.
const fileUploadStream = (req, res) => {
//get "body" args from header
const { id, fn } = JSON.parse(req.get('body'));
const Key = id + '/' + fn; //upload to s3 folder "id" with filename === fn
const params = {
Key,
Bucket: bucketName, //set somewhere
Body: req, //req is a stream
};
s3.upload(params, (err, data) => {
if (err) {
res.send('Error Uploading Data: ' + JSON.stringify(err) + '\n' + JSON.stringify(err.stack));
} else {
res.send(Key);
}
});
};
Да, это нарушает соглашение, но если вы посмотрите на суть, это намного чище, чем все, что я нашел, используя multer, busboy и т.д.
+1 для прагматизма и благодаря @SalehenRahman за помощь.
Ответ 7
Я использую KnexJS, и у меня возникли проблемы с использованием их потокового API. Я наконец исправил это, надеюсь, следующее поможет кому-то.
const knexStream = knex.select('*').from('my_table').stream();
const passThroughStream = new stream.PassThrough();
knexStream.on('data', (chunk) => passThroughStream.write(JSON.stringify(chunk) + '\n'));
knexStream.on('end', () => passThroughStream.end());
const uploadResult = await s3
.upload({
Bucket: 'my-bucket',
Key: 'stream-test.txt',
Body: passThroughStream
})
.promise();
Ответ 8
Ни один из ответов не помог мне, потому что я хотел:
- Pipe в
s3.upload()
- Передать результат
s3.upload()
в другой поток
Принятый ответ не делает последнего. Другие полагаются на обещание API, которое является трудоемким при работе с потоковыми каналами.
Это моя модификация принятого ответа.
const s3 = new S3();
function writeToS3({Key, Bucket}) {
const Body = new stream.PassThrough();
s3.upload({
Body,
Key,
Bucket: process.env.adpBucket
})
.on('httpUploadProgress', progress => {
console.log('progress', progress);
})
.send((err, data) => {
if (err) {
Body.destroy(err);
} else {
console.log('File uploaded and available at ${data.Location}');
Body.destroy();
}
});
return Body;
}
const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket});
pipeline.on('close', () => {
// upload finished, do something else
})
pipeline.on('error', () => {
// upload wasn't successful. Handle it
})
Ответ 9
Вот что следует отметить в наиболее принятом ответе выше:
Вам нужно вернуть проход в функцию, если вы используете конвейер,
fs.createReadStream(<filePath>).pipe(anyUploadFunction())
function anyUploadFunction () {
let pass = new stream.PassThrough();
return pass // <- Returning this pass is important for the stream to understand where it needs to write to.
}
В противном случае он будет молча переходить к следующему, не выдавая ошибку, или выдаст ошибку TypeError: dest.on is not a function
в зависимости от того, как вы написали функцию
Ответ 10
Если вы знаете размер потока, вы можете использовать minio-js для загрузки потока следующим образом:
s3Client.putObject('my-bucketname', 'my-objectname.ogg', stream, size, 'audio/ogg', function(e) {
if (e) {
return console.log(e)
}
console.log("Successfully uploaded the stream")
})