Atualmente estou usando um plugin node.js chamado s3-upload-stream para transmitir arquivos muito grandes para o Amazon S3. Ele usa a API multipartes e na maior parte funciona muito bem.
No entanto, este módulo está mostrando sua idade e eu já tive que fazer modificações nele (o autor também o tornou obsoleto). Hoje me deparei com outro problema com a Amazon e realmente gostaria de seguir a recomendação do autor e começar a usar o aws-sdk oficial para realizar meus uploads.
MAS.
O SDK oficial não parece suportar piping para s3.upload()
. A natureza de s3.upload é que você deve passar o fluxo legível como um argumento para o construtor S3.
Tenho cerca de 120 módulos de código de usuário que fazem vários processamentos de arquivos e são independentes do destino final de sua saída. O mecanismo fornece a eles um fluxo de saída gravável canalizável e eles canalizam para ele. Não posso entregar a eles um AWS.S3
objeto e pedir que o chamem upload()
sem adicionar código a todos os módulos. A razão pela qual usei s3-upload-stream
foi porque ele suportava tubulação.
Existe uma maneira de fazer do aws-sdk s3.upload()
algo para o qual eu possa canalizar o stream?
Resposta um pouco tarde, pode ajudar alguém, esperançosamente. Você pode retornar o fluxo gravável e a promessa, para obter dados de resposta quando o upload terminar.
const AWS = require('aws-sdk'); const stream = require('stream'); const uploadStream = ({ Bucket, Key }) => { const s3 = new AWS.S3(); const pass = new stream.PassThrough(); return { writeStream: pass, promise: s3.upload({ Bucket, Key, Body: pass }).promise(), }; }
E você pode usar a função da seguinte maneira:
const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'}); const readStream = fs.createReadStream('/path/to/yourfile.mp4'); const pipeline = readStream.pipe(writeStream);
Agora você pode verificar a promessa:
promise.then(() => { console.log('upload completed successfully'); }).catch((err) => { console.log('upload failed.', err.message); });
Ou como
stream.pipe()
retorna stream.Writable, o destino (variável writeStream acima), permitindo uma cadeia de tubos, também podemos usar seus eventos:pipeline.on('close', () => { console.log('upload successful'); }); pipeline.on('error', (err) => { console.log('upload failed', err.message) });
fonte
Na resposta aceita, a função termina antes que o upload seja concluído e, portanto, está incorreta. O código abaixo canaliza corretamente a partir de um fluxo legível.
Referência de upload
async function uploadReadableStream(stream) { const params = {Bucket: bucket, Key: key, Body: stream}; return s3.upload(params).promise(); } async function upload() { const readable = getSomeReadableStream(); const results = await uploadReadableStream(readable); console.log('upload complete', results); }
Você também pode dar um passo adiante e gerar informações de progresso usando
ManagedUpload
como tal:const manager = s3.upload(params); manager.on('httpUploadProgress', (progress) => { console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' } });
Referência de ManagedUpload
Uma lista de eventos disponíveis
fonte
TypeError: dest.on is not a function
alguma ideia por quê?dest.on
? você pode mostrar um exemplo? @FireBrandNenhuma das respostas funcionou para mim porque eu queria:
s3.upload()
s3.upload()
em outro fluxoA resposta aceita não faz o último. Os outros contam com a API de promessa, que é difícil de trabalhar ao trabalhar com tubulações de fluxo.
Esta é a minha modificação da resposta aceita.
const s3 = new S3(); function writeToS3({Key, Bucket}) { const Body = new stream.PassThrough(); s3.upload({ Body, Key, Bucket: process.env.adpBucket }) .on('httpUploadProgress', progress => { console.log('progress', progress); }) .send((err, data) => { if (err) { Body.destroy(err); } else { console.log(`File uploaded and available at ${data.Location}`); Body.destroy(); } }); return Body; } const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket}); pipeline.on('close', () => { // upload finished, do something else }) pipeline.on('error', () => { // upload wasn't successful. Handle it })
fonte
Solução de script de tipo:
Este exemplo usa:
import * as AWS from "aws-sdk"; import * as fsExtra from "fs-extra"; import * as zlib from "zlib"; import * as stream from "stream";
E função assíncrona:
public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> { const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => { const passT = new stream.PassThrough(); return { writeStream: passT, promise: S3.upload({ Bucket, Key, Body: passT }).promise(), }; }; const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key); fsExtra.createReadStream(filePath).pipe(writeStream); // NOTE: Addition You can compress to zip by .pipe(zlib.createGzip()).pipe(writeStream) let output = true; await promise.catch((reason)=> { output = false; console.log(reason);}); return output; }
Chame esse método em algum lugar como:
let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);
fonte
O que se deve notar aqui na resposta mais aceita acima é que: Você precisa retornar o passe na função se estiver usando um tubo como,
fs.createReadStream(<filePath>).pipe(anyUploadFunction())
function anyUploadFunction () { let pass = new stream.PassThrough(); return pass // <- Returning this pass is important for the stream to understand where it needs to write to. }
Caso contrário, ele irá silenciosamente para a próxima sem gerar um erro ou irá lançar um erro
TypeError: dest.on is not a function
dependendo de como você escreveu a funçãofonte
Se ajudar alguém, consegui transmitir do cliente para o s3 com sucesso:
https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a
O código do lado do servidor assume que
req
é um objeto de fluxo, no meu caso, ele foi enviado do cliente com informações de arquivo definidas nos cabeçalhos.const fileUploadStream = (req, res) => { //get "body" args from header const { id, fn } = JSON.parse(req.get('body')); const Key = id + '/' + fn; //upload to s3 folder "id" with filename === fn const params = { Key, Bucket: bucketName, //set somewhere Body: req, //req is a stream }; s3.upload(params, (err, data) => { if (err) { res.send('Error Uploading Data: ' + JSON.stringify(err) + '\n' + JSON.stringify(err.stack)); } else { res.send(Key); } }); };
Sim, quebra as convenções, mas se você olhar a essência, é muito mais limpo do que qualquer outra coisa que eu encontrei usando multer, busboy etc ...
+1 pelo pragmatismo e obrigado a @SalehenRahman por sua ajuda.
fonte
Pra quem reclama que quando usa a função de upload da api s3 e um arquivo de zero byte acaba no s3 (@ Radar155 e @gabo) - eu também tive esse problema.
Crie um segundo fluxo PassThrough e apenas canalize todos os dados do primeiro para o segundo e passe a referência daquele segundo para s3. Você pode fazer isso de duas maneiras diferentes - possivelmente uma forma suja é ouvir o evento "dados" no primeiro fluxo e, em seguida, gravar os mesmos dados no segundo fluxo - da mesma forma para o evento "fim" - basta chamar a função final no segundo fluxo. Não tenho ideia se isso é um bug no aws api, a versão do nó ou algum outro problema - mas resolveu o problema para mim.
Pode ser assim:
var PassThroughStream = require('stream').PassThrough; var srcStream = new PassThroughStream(); var rstream = fs.createReadStream('Learning/stocktest.json'); var sameStream = rstream.pipe(srcStream); // interesting note: (srcStream == sameStream) at this point var destStream = new PassThroughStream(); // call your s3.upload function here - passing in the destStream as the Body parameter srcStream.on('data', function (chunk) { destStream.write(chunk); }); srcStream.on('end', function () { dataStream.end(); });
fonte
Seguindo as outras respostas e usando o SDK da AWS para Node.js mais recente, há uma solução muito mais limpa e simples, pois a função s3 upload () aceita um fluxo, usando a sintaxe de await e a promessa de S3:
var model = await s3Client.upload({ Bucket : bucket, Key : key, ContentType : yourContentType, Body : fs.createReadStream(path-to-file) }).promise();
fonte
Estou usando o KnexJS e tive um problema ao usar a API de streaming. Eu finalmente consertei, espero que o seguinte ajude alguém.
const knexStream = knex.select('*').from('my_table').stream(); const passThroughStream = new stream.PassThrough(); knexStream.on('data', (chunk) => passThroughStream.write(JSON.stringify(chunk) + '\n')); knexStream.on('end', () => passThroughStream.end()); const uploadResult = await s3 .upload({ Bucket: 'my-bucket', Key: 'stream-test.txt', Body: passThroughStream }) .promise();
fonte
Se você sabe o tamanho do stream, você pode usar minio-js para fazer o upload do stream assim:
s3Client.putObject('my-bucketname', 'my-objectname.ogg', stream, size, 'audio/ogg', function(e) { if (e) { return console.log(e) } console.log("Successfully uploaded the stream") })
fonte