Canalize um stream para s3.upload ()

95

Atualmente estou usando um plugin node.js chamado s3-upload-stream para transmitir arquivos muito grandes para o Amazon S3. Ele usa a API multipartes e na maior parte funciona muito bem.

No entanto, este módulo está mostrando sua idade e eu já tive que fazer modificações nele (o autor também o tornou obsoleto). Hoje me deparei com outro problema com a Amazon e realmente gostaria de seguir a recomendação do autor e começar a usar o aws-sdk oficial para realizar meus uploads.

MAS.

O SDK oficial não parece suportar piping para s3.upload() . A natureza de s3.upload é que você deve passar o fluxo legível como um argumento para o construtor S3.

Tenho cerca de 120 módulos de código de usuário que fazem vários processamentos de arquivos e são independentes do destino final de sua saída. O mecanismo fornece a eles um fluxo de saída gravável canalizável e eles canalizam para ele. Não posso entregar a eles um AWS.S3objeto e pedir que o chamem upload()sem adicionar código a todos os módulos. A razão pela qual usei s3-upload-streamfoi porque ele suportava tubulação.

Existe uma maneira de fazer do aws-sdk s3.upload()algo para o qual eu possa canalizar o stream?

womp
fonte

Respostas:

137

Envolva a upload()função S3 com o stream.PassThrough()fluxo node.js.

Aqui está um exemplo:

inputStream
  .pipe(uploadFromStream(s3));

function uploadFromStream(s3) {
  var pass = new stream.PassThrough();

  var params = {Bucket: BUCKET, Key: KEY, Body: pass};
  s3.upload(params, function(err, data) {
    console.log(err, data);
  });

  return pass;
}
Casey Benko
fonte
2
Ótimo, isso resolveu meu hack muito feio = -) Você pode explicar o que o stream.PassThrough () realmente faz?
mraxus
6
O fluxo do PassThrough fecha quando você faz isso? Estou tendo um inferno de propagar o fechamento em s3.upload para atingir meu fluxo PassThrough.
four43 de
7
o tamanho do arquivo enviado é 0 byte. Se eu canalizar os mesmos dados do fluxo de origem para o sistema de arquivos, tudo funcionará bem. Qualquer ideia?
Radar155
3
Um fluxo de passagem pegará bytes gravados nele e os produzirá. Isso permite que você retorne um fluxo gravável que o aws-sdk irá ler enquanto você escreve nele. Eu também retornaria o objeto de resposta de s3.upload () porque, caso contrário, você não pode garantir que o upload seja concluído.
reconbot em
1
Não é o mesmo que passar o fluxo legível para o Corpo, mas com mais código? O SDK da AWS ainda chamará read () no fluxo PassThrough, portanto, não há um encanamento verdadeiro até o S3. A única diferença é que há um fluxo extra no meio.
ShadowChaser
96

Resposta um pouco tarde, pode ajudar alguém, esperançosamente. Você pode retornar o fluxo gravável e a promessa, para obter dados de resposta quando o upload terminar.

const AWS = require('aws-sdk');
const stream = require('stream');

const uploadStream = ({ Bucket, Key }) => {
  const s3 = new AWS.S3();
  const pass = new stream.PassThrough();
  return {
    writeStream: pass,
    promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
  };
}

E você pode usar a função da seguinte maneira:

const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'});
const readStream = fs.createReadStream('/path/to/yourfile.mp4');

const pipeline = readStream.pipe(writeStream);

Agora você pode verificar a promessa:

promise.then(() => {
  console.log('upload completed successfully');
}).catch((err) => {
  console.log('upload failed.', err.message);
});

Ou como stream.pipe()retorna stream.Writable, o destino (variável writeStream acima), permitindo uma cadeia de tubos, também podemos usar seus eventos:

 pipeline.on('close', () => {
   console.log('upload successful');
 });
 pipeline.on('error', (err) => {
   console.log('upload failed', err.message)
 });
Ahmet Cetin
fonte
Parece ótimo, mas do meu lado estou recebendo este erro stackoverflow.com/questions/62330721/…
Arco Voltaico
apenas respondeu à sua pergunta. espero que ajude.
Ahmet Cetin
49

Na resposta aceita, a função termina antes que o upload seja concluído e, portanto, está incorreta. O código abaixo canaliza corretamente a partir de um fluxo legível.

Referência de upload

async function uploadReadableStream(stream) {
  const params = {Bucket: bucket, Key: key, Body: stream};
  return s3.upload(params).promise();
}

async function upload() {
  const readable = getSomeReadableStream();
  const results = await uploadReadableStream(readable);
  console.log('upload complete', results);
}

Você também pode dar um passo adiante e gerar informações de progresso usando ManagedUploadcomo tal:

const manager = s3.upload(params);
manager.on('httpUploadProgress', (progress) => {
  console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
});

Referência de ManagedUpload

Uma lista de eventos disponíveis

tsuz
fonte
1
aws-sdk agora oferece promessas integradas ao 2.3.0+, então você não precisa mais levantá-las. s3.upload (params) .promise (). then (data => data) .catch (error => error);
DBrown
1
@DBrown Obrigado pelo ponteiro! Eu atualizei a resposta, de acordo.
tsuz
1
@tsuz, tentando implementar sua solução me deu um erro: TypeError: dest.on is not a functionalguma ideia por quê?
FireBrand
O que é dest.on? você pode mostrar um exemplo? @FireBrand
tsuz
9
Isso diz que a resposta aceita está incompleta, mas não funciona com piping para s3.upload, conforme indicado na postagem atualizada de @Womp. Seria muito útil se essa resposta fosse atualizada para obter a saída canalizada de outra coisa!
MattW
6

Nenhuma das respostas funcionou para mim porque eu queria:

  • Pipe em s3.upload()
  • Canalize o resultado de s3.upload()em outro fluxo

A resposta aceita não faz o último. Os outros contam com a API de promessa, que é difícil de trabalhar ao trabalhar com tubulações de fluxo.

Esta é a minha modificação da resposta aceita.

const s3 = new S3();

function writeToS3({Key, Bucket}) {
  const Body = new stream.PassThrough();

  s3.upload({
    Body,
    Key,
    Bucket: process.env.adpBucket
  })
   .on('httpUploadProgress', progress => {
       console.log('progress', progress);
   })
   .send((err, data) => {
     if (err) {
       Body.destroy(err);
     } else {
       console.log(`File uploaded and available at ${data.Location}`);
       Body.destroy();
     }
  });

  return Body;
}

const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket});

pipeline.on('close', () => {
  // upload finished, do something else
})
pipeline.on('error', () => {
  // upload wasn't successful. Handle it
})

cortopia
fonte
Parece ótimo, mas do meu lado estou recebendo este erro stackoverflow.com/questions/62330721/…
Arco Voltaico
5

Solução de script de tipo:
Este exemplo usa:

import * as AWS from "aws-sdk";
import * as fsExtra from "fs-extra";
import * as zlib from "zlib";
import * as stream from "stream";

E função assíncrona:

public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> { 

         const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => {
            const passT = new stream.PassThrough();
            return {
              writeStream: passT,
              promise: S3.upload({ Bucket, Key, Body: passT }).promise(),
            };
          };
        const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key);
        fsExtra.createReadStream(filePath).pipe(writeStream);     //  NOTE: Addition You can compress to zip by  .pipe(zlib.createGzip()).pipe(writeStream)
        let output = true;
        await promise.catch((reason)=> { output = false; console.log(reason);});
        return output;
}

Chame esse método em algum lugar como:

let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);
dzole vladimirov
fonte
4

O que se deve notar aqui na resposta mais aceita acima é que: Você precisa retornar o passe na função se estiver usando um tubo como,

fs.createReadStream(<filePath>).pipe(anyUploadFunction())

function anyUploadFunction () { 
 let pass = new stream.PassThrough();
 return pass // <- Returning this pass is important for the stream to understand where it needs to write to.
}

Caso contrário, ele irá silenciosamente para a próxima sem gerar um erro ou irá lançar um erro TypeError: dest.on is not a functiondependendo de como você escreveu a função

Varun Bhaya
fonte
3

Se ajudar alguém, consegui transmitir do cliente para o s3 com sucesso:

https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a

O código do lado do servidor assume que reqé um objeto de fluxo, no meu caso, ele foi enviado do cliente com informações de arquivo definidas nos cabeçalhos.

const fileUploadStream = (req, res) => {
  //get "body" args from header
  const { id, fn } = JSON.parse(req.get('body'));
  const Key = id + '/' + fn; //upload to s3 folder "id" with filename === fn
  const params = {
    Key,
    Bucket: bucketName, //set somewhere
    Body: req, //req is a stream
  };
  s3.upload(params, (err, data) => {
    if (err) {
      res.send('Error Uploading Data: ' + JSON.stringify(err) + '\n' + JSON.stringify(err.stack));
    } else {
      res.send(Key);
    }
  });
};

Sim, quebra as convenções, mas se você olhar a essência, é muito mais limpo do que qualquer outra coisa que eu encontrei usando multer, busboy etc ...

+1 pelo pragmatismo e obrigado a @SalehenRahman por sua ajuda.

Mattdlockyer
fonte
multer, busboy lida com uploads multipart / form-data. req as a stream funciona quando o cliente envia um buffer como corpo de XMLHttpRequest.
André Werlang
Pra esclarecer, o upload está sendo realizado pelo back end não é pelo cliente certo?
numX
Sim, está "canalizando" o fluxo, NO backend, mas veio de um frontend
mattdlockyer
3

Pra quem reclama que quando usa a função de upload da api s3 e um arquivo de zero byte acaba no s3 (@ Radar155 e @gabo) - eu também tive esse problema.

Crie um segundo fluxo PassThrough e apenas canalize todos os dados do primeiro para o segundo e passe a referência daquele segundo para s3. Você pode fazer isso de duas maneiras diferentes - possivelmente uma forma suja é ouvir o evento "dados" no primeiro fluxo e, em seguida, gravar os mesmos dados no segundo fluxo - da mesma forma para o evento "fim" - basta chamar a função final no segundo fluxo. Não tenho ideia se isso é um bug no aws api, a versão do nó ou algum outro problema - mas resolveu o problema para mim.

Pode ser assim:

var PassThroughStream = require('stream').PassThrough;
var srcStream = new PassThroughStream();

var rstream = fs.createReadStream('Learning/stocktest.json');
var sameStream = rstream.pipe(srcStream);
// interesting note: (srcStream == sameStream) at this point
var destStream = new PassThroughStream();
// call your s3.upload function here - passing in the destStream as the Body parameter
srcStream.on('data', function (chunk) {
    destStream.write(chunk);
});

srcStream.on('end', function () {
    dataStream.end();
});
Tim
fonte
Isso realmente funcionou para mim também. A função de upload do S3 simplesmente "morria" silenciosamente sempre que um upload de várias partes era usado, mas ao usar sua solução funcionou bem (!). Obrigado! :)
jhdrn
Você pode dar algumas informações sobre por que o segundo fluxo é necessário?
noob7
2

Seguindo as outras respostas e usando o SDK da AWS para Node.js mais recente, há uma solução muito mais limpa e simples, pois a função s3 upload () aceita um fluxo, usando a sintaxe de await e a promessa de S3:

var model = await s3Client.upload({
    Bucket : bucket,
    Key : key,
    ContentType : yourContentType,
    Body : fs.createReadStream(path-to-file)
}).promise();
Emich
fonte
Isso funciona para o caso de uso específico de "ler um arquivo muito grande" que o autor mencionou, mas as outras respostas ainda são válidas se você estiver usando fluxos fora do contexto de um arquivo (por exemplo, tentando escrever um fluxo de cursor mongo para s3 onde você ainda precisa usar um fluxo PassThrough + pipe)
Ken Colton
0

Estou usando o KnexJS e tive um problema ao usar a API de streaming. Eu finalmente consertei, espero que o seguinte ajude alguém.

const knexStream = knex.select('*').from('my_table').stream();
const passThroughStream = new stream.PassThrough();

knexStream.on('data', (chunk) => passThroughStream.write(JSON.stringify(chunk) + '\n'));
knexStream.on('end', () => passThroughStream.end());

const uploadResult = await s3
  .upload({
    Bucket: 'my-bucket',
    Key: 'stream-test.txt',
    Body: passThroughStream
  })
  .promise();
TestWell
fonte
-3

Se você sabe o tamanho do stream, você pode usar minio-js para fazer o upload do stream assim:

  s3Client.putObject('my-bucketname', 'my-objectname.ogg', stream, size, 'audio/ogg', function(e) {
    if (e) {
      return console.log(e)
    }
    console.log("Successfully uploaded the stream")
  })
Krishna Srinivas
fonte