Analisando arquivos de log enormes no Node.js - leia linha por linha

125

Eu preciso fazer uma análise de arquivos de log grandes (5-10 Gb) em Javascript / Node.js (estou usando o Cube).

A linha de log se parece com:

10:00:43.343423 I'm a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS".

Precisamos ler cada linha, fazer uma análise (por exemplo, retirar 5, 7e SUCCESS) e, em seguida bombear esses dados em Cube ( https://github.com/square/cube ) usando seu cliente JS.

Primeiramente, qual é a maneira canônica no Node de ler um arquivo, linha por linha?

Parece ser uma pergunta bastante comum online:

Muitas das respostas parecem apontar para vários módulos de terceiros:

No entanto, isso parece uma tarefa bastante básica - certamente, há uma maneira simples no stdlib de ler um arquivo de texto, linha por linha?

Em segundo lugar, preciso processar cada linha (por exemplo, converter o registro de data e hora em um objeto Date e extrair campos úteis).

Qual é a melhor maneira de fazer isso, maximizando a taxa de transferência? Existe alguma maneira de não bloquear a leitura em cada linha ou enviá-la para o Cube?

Em terceiro lugar - acho que usando divisões de string, e o equivalente JS de contains (IndexOf! = -1?) Será muito mais rápido que as expressões regulares? Alguém já teve muita experiência na análise de grandes quantidades de dados de texto no Node.js.

Cheers, Victor

victorhooi
fonte
Eu construí um analisador de log no nó que leva um monte de seqüências de caracteres regex com 'capturas' incorporadas e saídas para JSON. Você pode até chamar funções em cada captura, se quiser fazer um cálculo. Ele pode fazer o que você deseja: npmjs.org/package/logax
Jess

Respostas:

208

Procurei uma solução para analisar arquivos muito grandes (gbs), linha por linha, usando um fluxo. Todas as bibliotecas e exemplos de terceiros não atenderam às minhas necessidades, pois processaram os arquivos não linha por linha (como 1, 2, 3, 4 ..) ou leram o arquivo inteiro na memória

A solução a seguir pode analisar arquivos muito grandes, linha por linha, usando stream & pipe. Para testar, usei um arquivo de 2,1 gb com 17.000.000 registros. O uso de RAM não excedeu 60 mb.

Primeiro, instale o pacote de fluxo de eventos :

npm install event-stream

Então:

var fs = require('fs')
    , es = require('event-stream');

var lineNr = 0;

var s = fs.createReadStream('very-large-file.csv')
    .pipe(es.split())
    .pipe(es.mapSync(function(line){

        // pause the readstream
        s.pause();

        lineNr += 1;

        // process line here and call s.resume() when rdy
        // function below was for logging memory usage
        logMemoryUsage(lineNr);

        // resume the readstream, possibly from a callback
        s.resume();
    })
    .on('error', function(err){
        console.log('Error while reading file.', err);
    })
    .on('end', function(){
        console.log('Read entire file.')
    })
);

insira a descrição da imagem aqui

Por favor, deixe-me saber como vai!

Gerard
fonte
6
Para sua informação, este código não é síncrono. É assíncrono. Se você inserir console.log(lineNr)após a última linha do seu código, ele não mostrará a contagem final de linhas porque o arquivo é lido de forma assíncrona.
jfriend00
4
Obrigado, essa foi a única solução que pude encontrar que realmente parou e foi retomada quando deveria. Readline não.
Brent
3
Exemplo impressionante, e realmente faz uma pausa. Além disso, se você decidir interromper a leitura do arquivo mais cedo, poderá usars.end();
zipzit 23/02
2
Funcionou como um encanto. Utilizou-o para indexar 150 milhões de documentos para o índice elasticsearch. readlinemódulo é uma dor. Ele não pausa e estava causando falhas todas as vezes após 40-50 milhões. Desperdiçou um dia. Muito obrigado pela resposta. Este funciona perfeitamente
Mandeep Singh
3
o fluxo de eventos foi comprometido: medium.com/intrinsic/…, mas 4+ é aparentemente seguro blog.npmjs.org/post/180565383195/…
John Vandivier 02/09/19
72

Você pode usar o readlinepacote embutido , consulte a documentação aqui . Eu uso o fluxo para criar um novo fluxo de saída.

var fs = require('fs'),
    readline = require('readline'),
    stream = require('stream');

var instream = fs.createReadStream('/path/to/file');
var outstream = new stream;
outstream.readable = true;
outstream.writable = true;

var rl = readline.createInterface({
    input: instream,
    output: outstream,
    terminal: false
});

rl.on('line', function(line) {
    console.log(line);
    //Do your stuff ...
    //Then write to outstream
    rl.write(cubestuff);
});

Arquivos grandes levarão algum tempo para serem processados. Diga se funciona.

user568109
fonte
2
Conforme gravado, a penúltima linha falha porque o cubestuff não está definido.
18714 Greg Greg
2
Usando readline, é possível pausar / retomar o fluxo de leitura para executar ações assíncronas na área "fazer coisas"?
jchook
1
@jchook readlineestava me dando muitos problemas quando tentei pausar / retomar. Ele não interromper o fluxo de criar corretamente um monte de problema se o processo a jusante é mais lento
Mandeep Singh
31

Eu realmente gostei da resposta @gerard, que realmente merece ser a resposta correta aqui. Fiz algumas melhorias:

  • O código está em uma classe (modular)
  • A análise está incluída
  • A capacidade de retomar é fornecida para o exterior, caso haja um trabalho assíncrono acorrentado à leitura do CSV, como inserção no DB ou uma solicitação HTTP
  • Leitura em tamanhos de partes / batche que o usuário pode declarar. Também cuidei da codificação no fluxo, caso você tenha arquivos com codificação diferente.

Aqui está o código:

'use strict'

const fs = require('fs'),
    util = require('util'),
    stream = require('stream'),
    es = require('event-stream'),
    parse = require("csv-parse"),
    iconv = require('iconv-lite');

class CSVReader {
  constructor(filename, batchSize, columns) {
    this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream('utf8'))
    this.batchSize = batchSize || 1000
    this.lineNumber = 0
    this.data = []
    this.parseOptions = {delimiter: '\t', columns: true, escape: '/', relax: true}
  }

  read(callback) {
    this.reader
      .pipe(es.split())
      .pipe(es.mapSync(line => {
        ++this.lineNumber

        parse(line, this.parseOptions, (err, d) => {
          this.data.push(d[0])
        })

        if (this.lineNumber % this.batchSize === 0) {
          callback(this.data)
        }
      })
      .on('error', function(){
          console.log('Error while reading file.')
      })
      .on('end', function(){
          console.log('Read entirefile.')
      }))
  }

  continue () {
    this.data = []
    this.reader.resume()
  }
}

module.exports = CSVReader

Então, basicamente, aqui está como você o usará:

let reader = CSVReader('path_to_file.csv')
reader.read(() => reader.continue())

Eu testei isso com um arquivo CSV de 35GB e funcionou para mim, e foi por isso que escolhi construí-lo com base na resposta da @gerard . Os feedbacks são bem-vindos.

ambodi
fonte
quanto tempo levou?
Z. Khullah 19/06/19
Aparentemente, isso não tem pause()ligação, não é?
Vanuan 02/11/19
Além disso, isso não chama a função de retorno de chamada final. Portanto, se batchSize for 100, o tamanho dos arquivos for 150, apenas 100 itens serão processados. Estou errado?
Vanuan
16

Usei https://www.npmjs.com/package/line-by-line para ler mais de 1 000 000 linhas de um arquivo de texto. Nesse caso, uma capacidade ocupada de RAM era de cerca de 50 a 60 megabytes.

    const LineByLineReader = require('line-by-line'),
    lr = new LineByLineReader('big_file.txt');

    lr.on('error', function (err) {
         // 'err' contains error object
    });

    lr.on('line', function (line) {
        // pause emitting of lines...
        lr.pause();

        // ...do your asynchronous line processing..
        setTimeout(function () {
            // ...and continue emitting lines.
            lr.resume();
        }, 100);
    });

    lr.on('end', function () {
         // All lines are read, file is closed now.
    });
Eugene Ilyushin
fonte
'linha por linha' é mais eficiente em termos de memória que a resposta selecionada. Para 1 milhão de linhas em um csv, a resposta selecionada teve meu processo de nó nos baixos 800s de megabytes. Usando 'linha por linha', foi consistentemente nos 700s baixos. Este módulo também mantém o código limpo e fácil de ler. No total, precisarei ler cerca de 18 milhões para que cada mb conte!
Neo
é uma pena que isso use seu próprio evento 'line' em vez do 'chunk' padrão, o que significa que você não poderá usar o 'pipe'.
Rene Wooller
Após horas de teste e pesquisa, essa é a única solução que realmente pára no lr.cancel()método. Lê as primeiras 1000 linhas de um arquivo 5Gig em 1ms. Impressionante!!!!
Perez Lamed van Niekerk
6

Além de ler o grande arquivo linha por linha, você também pode lê-lo pedaço por pedaço. Para mais informações, consulte este artigo

var offset = 0;
var chunkSize = 2048;
var chunkBuffer = new Buffer(chunkSize);
var fp = fs.openSync('filepath', 'r');
var bytesRead = 0;
while(bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset)) {
    offset += bytesRead;
    var str = chunkBuffer.slice(0, bytesRead).toString();
    var arr = str.split('\n');

    if(bytesRead = chunkSize) {
        // the last item of the arr may be not a full line, leave it to the next chunk
        offset -= arr.pop().length;
    }
    lines.push(arr);
}
console.log(lines);
Kris Roofe
fonte
Será que o seguinte deve ser uma comparação em vez de uma atribuição if(bytesRead = chunkSize):?
Stefan Rein
4

A documentação do Node.js. oferece um exemplo muito elegante usando o módulo Readline.

Exemplo: Ler fluxo de arquivos linha por linha

const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
    input: fs.createReadStream('sample.txt'),
    crlfDelay: Infinity
});

rl.on('line', (line) => {
    console.log(`Line from file: ${line}`);
});

Nota: usamos a opção crlfDelay para reconhecer todas as instâncias de CR LF ('\ r \ n') como uma única quebra de linha.

Jaime Gómez
fonte
3

Eu ainda tinha o mesmo problema. Depois de comparar vários módulos que parecem ter esse recurso, eu decidi fazer isso sozinho, é mais simples do que eu pensava.

gist: https://gist.github.com/deemstone/8279565

var fetchBlock = lineByline(filepath, onEnd);
fetchBlock(function(lines, start){ ... });  //lines{array} start{int} lines[0] No.

Ele cobre o arquivo aberto em um fechamento, que fetchBlock()retornou buscará um bloco do arquivo, terminará dividido em matriz (tratará o segmento da última busca).

Eu configurei o tamanho do bloco para 1024 para cada operação de leitura. Isso pode ter erros, mas a lógica do código é óbvia, tente você mesmo.

deemstone
fonte
2

node-byline usa fluxos, então eu preferiria aquele para seus arquivos enormes.

para suas conversões de data, eu usaria o moment.js .

para maximizar seu rendimento, você pode pensar em usar um cluster de software. existem alguns módulos agradáveis ​​que envolvem muito bem o módulo de cluster nativo do nó. eu gosto do cluster-master do isaacs. por exemplo, você pode criar um cluster de x workers que calculam todos os arquivos.

para comparações de divisões versus regexes, use benchmark.js . Eu não testei até agora. benchmark.js está disponível como um módulo de nó

hereandnow78
fonte
2

Com base nessas perguntas, respondi que implementei uma classe que você pode usar para ler um arquivo de forma síncrona, linha por linha fs.readSync(). Você pode fazer essa "pausa" e "retomar" usando uma Qpromessa ( jQueryparece exigir um DOM, portanto não é possível executá-lo nodejs):

var fs = require('fs');
var Q = require('q');

var lr = new LineReader(filenameToLoad);
lr.open();

var promise;
workOnLine = function () {
    var line = lr.readNextLine();
    promise = complexLineTransformation(line).then(
        function() {console.log('ok');workOnLine();},
        function() {console.log('error');}
    );
}
workOnLine();

complexLineTransformation = function (line) {
    var deferred = Q.defer();
    // ... async call goes here, in callback: deferred.resolve('done ok'); or deferred.reject(new Error(error));
    return deferred.promise;
}

function LineReader (filename) {      
  this.moreLinesAvailable = true;
  this.fd = undefined;
  this.bufferSize = 1024*1024;
  this.buffer = new Buffer(this.bufferSize);
  this.leftOver = '';

  this.read = undefined;
  this.idxStart = undefined;
  this.idx = undefined;

  this.lineNumber = 0;

  this._bundleOfLines = [];

  this.open = function() {
    this.fd = fs.openSync(filename, 'r');
  };

  this.readNextLine = function () {
    if (this._bundleOfLines.length === 0) {
      this._readNextBundleOfLines();
    }
    this.lineNumber++;
    var lineToReturn = this._bundleOfLines[0];
    this._bundleOfLines.splice(0, 1); // remove first element (pos, howmany)
    return lineToReturn;
  };

  this.getLineNumber = function() {
    return this.lineNumber;
  };

  this._readNextBundleOfLines = function() {
    var line = "";
    while ((this.read = fs.readSync(this.fd, this.buffer, 0, this.bufferSize, null)) !== 0) { // read next bytes until end of file
      this.leftOver += this.buffer.toString('utf8', 0, this.read); // append to leftOver
      this.idxStart = 0
      while ((this.idx = this.leftOver.indexOf("\n", this.idxStart)) !== -1) { // as long as there is a newline-char in leftOver
        line = this.leftOver.substring(this.idxStart, this.idx);
        this._bundleOfLines.push(line);        
        this.idxStart = this.idx + 1;
      }
      this.leftOver = this.leftOver.substring(this.idxStart);
      if (line !== "") {
        break;
      }
    }
  }; 
}
Benvorth
fonte
0
import * as csv from 'fast-csv';
import * as fs from 'fs';
interface Row {
  [s: string]: string;
}
type RowCallBack = (data: Row, index: number) => object;
export class CSVReader {
  protected file: string;
  protected csvOptions = {
    delimiter: ',',
    headers: true,
    ignoreEmpty: true,
    trim: true
  };
  constructor(file: string, csvOptions = {}) {
    if (!fs.existsSync(file)) {
      throw new Error(`File ${file} not found.`);
    }
    this.file = file;
    this.csvOptions = Object.assign({}, this.csvOptions, csvOptions);
  }
  public read(callback: RowCallBack): Promise < Array < object >> {
    return new Promise < Array < object >> (resolve => {
      const readStream = fs.createReadStream(this.file);
      const results: Array < any > = [];
      let index = 0;
      const csvStream = csv.parse(this.csvOptions).on('data', async (data: Row) => {
        index++;
        results.push(await callback(data, index));
      }).on('error', (err: Error) => {
        console.error(err.message);
        throw err;
      }).on('end', () => {
        resolve(results);
      });
      readStream.pipe(csvStream);
    });
  }
}
import { CSVReader } from '../src/helpers/CSVReader';
(async () => {
  const reader = new CSVReader('./database/migrations/csv/users.csv');
  const users = await reader.read(async data => {
    return {
      username: data.username,
      name: data.name,
      email: data.email,
      cellPhone: data.cell_phone,
      homePhone: data.home_phone,
      roleId: data.role_id,
      description: data.description,
      state: data.state,
    };
  });
  console.log(users);
})();
Raza
fonte
-1

Eu criei um módulo de nó para ler arquivos grandes de forma assíncrona ou JSON. Testado em arquivos grandes.

var fs = require('fs')
, util = require('util')
, stream = require('stream')
, es = require('event-stream');

module.exports = FileReader;

function FileReader(){

}

FileReader.prototype.read = function(pathToFile, callback){
    var returnTxt = '';
    var s = fs.createReadStream(pathToFile)
    .pipe(es.split())
    .pipe(es.mapSync(function(line){

        // pause the readstream
        s.pause();

        //console.log('reading line: '+line);
        returnTxt += line;        

        // resume the readstream, possibly from a callback
        s.resume();
    })
    .on('error', function(){
        console.log('Error while reading file.');
    })
    .on('end', function(){
        console.log('Read entire file.');
        callback(returnTxt);
    })
);
};

FileReader.prototype.readJSON = function(pathToFile, callback){
    try{
        this.read(pathToFile, function(txt){callback(JSON.parse(txt));});
    }
    catch(err){
        throw new Error('json file is not valid! '+err.stack);
    }
};

Apenas salve o arquivo como file-reader.js e use-o assim:

var FileReader = require('./file-reader');
var fileReader = new FileReader();
fileReader.readJSON(__dirname + '/largeFile.json', function(jsonObj){/*callback logic here*/});
Eyal Zoref
fonte
7
Parece que você copiou da resposta de Gerard. Você deve dar crédito a Gerard pela parte que copiou.
Paul Lynch