Como leio o conteúdo de um fluxo Node.js em uma variável de string?

113

Estou hackeando um programa Node que usa smtp-protocolpara capturar emails SMTP e agir sobre os dados de email. A biblioteca fornece os dados de correio como um fluxo, e não sei como colocá-los em uma string.

No momento, estou escrevendo para stdout com stream.pipe(process.stdout, { end: false }), mas, como disse, preciso dos dados do fluxo em uma string, que posso usar assim que o fluxo terminar.

Como faço para coletar todos os dados de um fluxo Node.js em uma string?

Obrienmd
fonte
Você deve copiar o stream ou sinalizá-lo com (autoClose: false). É uma má prática poluir a memória.
19h

Respostas:

41

(Esta resposta é de anos atrás, quando era a melhor resposta. Agora há uma resposta melhor abaixo. Eu não acompanhei o node.js e não posso excluir esta resposta porque está marcada como "correto nesta pergunta ". Se você está pensando em clicar, o que você quer que eu faça?)

A chave é usar os eventos datae endde um fluxo legível . Ouça estes eventos:

stream.on('data', (chunk) => { ... });
stream.on('end', () => { ... });

Ao receber o dataevento, adicione o novo bloco de dados a um Buffer criado para coletar os dados.

Ao receber o endevento, converta o Buffer preenchido em uma string, se necessário. Em seguida, faça o que você precisa fazer com ele.

ControlAltDel
fonte
149
Algumas linhas de código que ilustram a resposta são preferíveis a apenas apontar um link para a API. Não discorde da resposta, apenas não acredite que seja completa o suficiente.
arcseldon
3
Com as versões mais recentes do node.js, isso é mais limpo: stackoverflow.com/a/35530615/271961
Simon A. Eugster
A resposta deve ser atualizada para não recomendar o uso de uma biblioteca Promises, mas usar Promises nativas.
Dan Dascalescu
@DanDascalescu eu concordo com você. O problema é que escrevi esta resposta há 7 anos e não acompanhei o node.js. Se você for outra pessoa e gostaria de atualizá-lo, seria ótimo. Ou eu poderia simplesmente excluí-lo, pois já parece haver uma resposta melhor. O que você recomendaria?
ControlAltDel
@ControlAltDel: Agradeço sua iniciativa de excluir uma resposta que não é mais a melhor. Gostaria que os outros tivessem disciplina semelhante .
Dan Dascalescu
129

Outra forma seria converter o fluxo em uma promessa (consulte o exemplo abaixo) e usar then(ou await) para atribuir o valor resolvido a uma variável.

function streamToString (stream) {
  const chunks = []
  return new Promise((resolve, reject) => {
    stream.on('data', chunk => chunks.push(chunk))
    stream.on('error', reject)
    stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')))
  })
}

const result = await streamToString(stream)
Marlon Bernardes
fonte
Estou muito novo para córregos e promessas e eu estou recebendo este erro: SyntaxError: await is only valid in async function. O que estou fazendo de errado?
JohnK
Você deve chamar a função streamtostring em uma função assíncrona. Para evitar isso, você também pode fazerstreamToString(stream).then(function(response){//Do whatever you want with response});
Enclo Creations
23
Esta deve ser a melhor resposta. Parabéns por produzir a única solução que acerta tudo, com (1) armazenar os chunks como Buffers e apenas chamar .toString("utf8")no final, para evitar o problema de uma falha de decodificação se um chunk for dividido no meio de um caractere multibyte; (2) tratamento de erros real; (3) colocar o código em uma função, para que possa ser reutilizado, não copiado e colado; (4) usando Promises para que a função possa ser awaitativada; (5) pequeno código que não arrasta um milhão de dependências, ao contrário de certas bibliotecas npm; (6) Sintaxe ES6 e melhores práticas modernas.
MultiplyByZer0 de
Por que não mover a matriz de pedaços para a promessa?
Jenny O'Reilly
1
Depois de sugerir essencialmente o mesmo código usando a resposta principal atual como dica, percebi que o código acima pode falhar com Uncaught TypeError [ERR_INVALID_ARG_TYPE]: The "list[0]" argument must be an instance of Buffer or Uint8Array. Received type stringse o fluxo produzir stringpedaços em vez de Buffer. O uso chunks.push(Buffer.from(chunk))deve funcionar com os blocos stringe Buffer.
Andrei LED
67

Nenhuma das opções acima funcionou para mim. Eu precisava usar o objeto Buffer:

  const chunks = [];

  readStream.on("data", function (chunk) {
    chunks.push(chunk);
  });

  // Send the buffer or you can put it into a var
  readStream.on("end", function () {
    res.send(Buffer.concat(chunks));
  });
Ricky
fonte
7
esta é realmente a maneira mais limpa de fazer isso;)
Ivo
7
Funciona bem. Apenas uma observação: se você quiser um tipo de string adequado, precisará chamar .toString () no objeto Buffer resultante da chamada concat ()
Bryan Johnson
64

Espero que seja mais útil do que a resposta acima:

var string = '';
stream.on('data',function(data){
  string += data.toString();
  console.log('stream data ' + part);
});

stream.on('end',function(){
  console.log('final output ' + string);
});

Observe que a concatenação de string não é a maneira mais eficiente de coletar as partes da string, mas é usada para simplificar (e talvez seu código não se importe com a eficiência).

Além disso, esse código pode produzir falhas imprevisíveis para texto não ASCII (ele assume que cada caractere cabe em um byte), mas talvez você também não se importe com isso.

Tom Carchrae
fonte
4
Qual seria a forma mais eficiente de coletar partes de cordas? TY
sean2078
2
você poderia usar um buffer docs.nodejitsu.com/articles/advanced/buffers/how-to-use-buffers, mas realmente depende do seu uso.
Tom Carchrae
2
Use um array de strings onde você acrescenta cada novo pedaço ao array e chama join("")o array no final.
Valeriu Paloş,
14
Isso não está certo. Se o buffer estiver na metade de um ponto de código multibyte, o toString () receberá o utf-8 malformado e você terá um monte de em sua string.
alextgordon
2
@alextgordon está certo. Em alguns casos muito raros, quando eu tinha muitos pedaços, obtive-os no início e no final dos pedaços. Especialmente quando há símbolos russos nas bordas. Portanto, é correto concatenar pedaços e convertê-los no final, em vez de converter pedaços e concatená-los. No meu caso, a solicitação foi feita de um serviço para outro com request.js com codificação padrão
Mike Yermolayev
21

Normalmente, estou usando esta função simples para transformar um stream em uma string:

function streamToString(stream, cb) {
  const chunks = [];
  stream.on('data', (chunk) => {
    chunks.push(chunk.toString());
  });
  stream.on('end', () => {
    cb(chunks.join(''));
  });
}

Exemplo de uso:

let stream = fs.createReadStream('./myFile.foo');
streamToString(stream, (data) => {
  console.log(data);  // data is now my string variable
});
impulso de sonho
fonte
1
Resposta útil, mas parece que cada pedaço deve ser convertido em uma string antes de ser colocado na matriz:chunks.push(chunk.toString());
Nicolas Le Thierry d'Ennequin
1
Este é o único que funcionou para mim!
Muito
1
Essa foi uma ótima resposta!
Aft3rL1f3
12

E ainda outro para strings usando promessas:

function getStream(stream) {
  return new Promise(resolve => {
    const chunks = [];

    # Buffer.from is required if chunk is a String, see comments
    stream.on("data", chunk => chunks.push(Buffer.from(chunk)));
    stream.on("end", () => resolve(Buffer.concat(chunks).toString()));
  });
}

Uso:

const stream = fs.createReadStream(__filename);
getStream(stream).then(r=>console.log(r));

remova o .toString()para usar com dados binários, se necessário.

update : @AndreiLED apontou corretamente que há problemas com strings. Não consegui obter um fluxo que retornasse strings com a versão do nó que possuo, mas a API observa que isso é possível.

Estani
fonte
Percebi que o código acima pode falhar com Uncaught TypeError [ERR_INVALID_ARG_TYPE]: The "list[0]" argument must be an instance of Buffer or Uint8Array. Received type stringse o fluxo produzir stringpedaços em vez de Buffer. O uso chunks.push(Buffer.from(chunk))deve funcionar com os blocos stringe Buffer.
Andrei LED
bom ponto, eu atualizei a resposta. Obrigado.
Estani
8

A partir da documentação do nodejs , você deve fazer isso - lembre-se sempre de uma string sem saber que a codificação é apenas um monte de bytes:

var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
})
Sebastian J.
fonte
6

Streams não tem uma .toString()função simples (que eu entendo) nem algo como uma .toStringAsync(cb)função (que eu não entendo).

Então, criei minha própria função auxiliar:

var streamToString = function(stream, callback) {
  var str = '';
  stream.on('data', function(chunk) {
    str += chunk;
  });
  stream.on('end', function() {
    callback(str);
  });
}

// how to use:
streamToString(myStream, function(myStr) {
  console.log(myStr);
});
flori
fonte
4

Tive mais sorte usando assim:

let string = '';
readstream
    .on('data', (buf) => string += buf.toString())
    .on('end', () => console.log(string));

Eu uso o nó v9.11.1e readstreamé a resposta de um http.getretorno de chamada.

vdegenne
fonte
3

A solução mais limpa pode ser usar o pacote "string-stream", que converte um fluxo em uma string com uma promessa.

const streamString = require('stream-string')

streamString(myStream).then(string_variable => {
    // myStream was converted to a string, and that string is stored in string_variable
    console.log(string_variable)

}).catch(err => {
     // myStream emitted an error event (err), so the promise from stream-string was rejected
    throw err
})
Steve Breese
fonte
3

Maneira fácil com a biblioteca get-stream popular (mais de 5 milhões de downloads semanais) e leve :

https://www.npmjs.com/package/get-stream

const fs = require('fs');
const getStream = require('get-stream');

(async () => {
    const stream = fs.createReadStream('unicorn.txt');
    console.log(await getStream(stream)); //output is string
})();
Ville
fonte
2

Que tal algo como um redutor de fluxo?

Aqui está um exemplo usando classes ES6 como usar um.

var stream = require('stream')

class StreamReducer extends stream.Writable {
  constructor(chunkReducer, initialvalue, cb) {
    super();
    this.reducer = chunkReducer;
    this.accumulator = initialvalue;
    this.cb = cb;
  }
  _write(chunk, enc, next) {
    this.accumulator = this.reducer(this.accumulator, chunk);
    next();
  }
  end() {
    this.cb(null, this.accumulator)
  }
}

// just a test stream
class EmitterStream extends stream.Readable {
  constructor(chunks) {
    super();
    this.chunks = chunks;
  }
  _read() {
    this.chunks.forEach(function (chunk) { 
        this.push(chunk);
    }.bind(this));
    this.push(null);
  }
}

// just transform the strings into buffer as we would get from fs stream or http request stream
(new EmitterStream(
  ["hello ", "world !"]
  .map(function(str) {
     return Buffer.from(str, 'utf8');
  })
)).pipe(new StreamReducer(
  function (acc, v) {
    acc.push(v);
    return acc;
  },
  [],
  function(err, chunks) {
    console.log(Buffer.concat(chunks).toString('utf8'));
  })
);
Fred
fonte
1

Isso funcionou para mim e é baseado nos documentos do Node v6.7.0 :

let output = '';
stream.on('readable', function() {
    let read = stream.read();
    if (read !== null) {
        // New stream data is available
        output += read.toString();
    } else {
        // Stream is now finished when read is null.
        // You can callback here e.g.:
        callback(null, output);
    }
});

stream.on('error', function(err) {
  callback(err, null);
})
anthonygore
fonte
1

setEncoding ('utf8');

Muito bem, Sebastian J acima.

Eu tive o "problema de buffer" com algumas linhas de código de teste que eu tinha, e adicionei as informações de codificação e isso resolveu, veja abaixo.

Demonstre o problema

Programas

// process.stdin.setEncoding('utf8');
process.stdin.on('data', (data) => {
    console.log(typeof(data), data);
});

entrada

hello world

resultado

object <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64 0d 0a>

Demonstre a solução

Programas

process.stdin.setEncoding('utf8'); // <- Activate!
process.stdin.on('data', (data) => {
    console.log(typeof(data), data);
});

entrada

hello world

resultado

string hello world
Ivan
fonte
1

Todas as respostas listadas parecem abrir o fluxo legível no modo de fluxo, que não é o padrão no NodeJS e pode ter limitações, pois não tem suporte de contrapressão que o NodeJS fornece no modo de fluxo legível pausado. Aqui está uma implementação usando Just Buffers, Native Stream e Native Stream Transforms e suporte para o modo de objeto

import {Transform} from 'stream';

let buffer =null;    

function objectifyStream() {
    return new Transform({
        objectMode: true,
        transform: function(chunk, encoding, next) {

            if (!buffer) {
                buffer = Buffer.from([...chunk]);
            } else {
                buffer = Buffer.from([...buffer, ...chunk]);
            }
            next(null, buffer);
        }
    });
}

process.stdin.pipe(objectifyStream()).process.stdout
Herlarby
fonte
1

O que você pensa sobre isso ?

// lets a ReadableStream under stream variable 
const chunks = [];

for await (let chunk of stream) {
    chunks.push(chunk)
}

const buffer  = Buffer.concat(chunks);
const str = buffer.toString("utf-8")
Traycho Ivanov
fonte
Funciona muito limpo, sem dependências, legal!
ViRuSTriNiTy
0

Usando o pacote bastante popularstream-buffers que você provavelmente já tem nas dependências do seu projeto, isso é bastante simples:

// imports
const { WritableStreamBuffer } = require('stream-buffers');
const { promisify } = require('util');
const { createReadStream } = require('fs');
const pipeline = promisify(require('stream').pipeline);

// sample stream
let stream = createReadStream('/etc/hosts');

// pipeline the stream into a buffer, and print the contents when done
let buf = new WritableStreamBuffer();
pipeline(stream, buf).then(() => console.log(buf.getContents().toString()));
andrewdotn
fonte
0

No meu caso, os cabeçalhos de resposta do tipo de conteúdo eram Content-Type: text / plain . Então, eu li os dados do Buffer como:

let data = [];
stream.on('data', (chunk) => {
 console.log(Buffer.from(chunk).toString())
 data.push(Buffer.from(chunk).toString())
});
Dionis Oros
fonte