Analisar arquivo JSON grande em Nodejs

98

Eu tenho um arquivo que armazena muitos objetos JavaScript no formato JSON e preciso ler o arquivo, criar cada um dos objetos e fazer algo com eles (inseri-los em um banco de dados no meu caso). Os objetos JavaScript podem ser representados em um formato:

Formato A:

[{name: 'thing1'},
....
{name: 'thing999999999'}]

ou Formato B:

{name: 'thing1'}         // <== My choice.
...
{name: 'thing999999999'}

Observe que ...indica muitos objetos JSON. Estou ciente de que posso ler todo o arquivo na memória e usar JSON.parse()desta forma:

fs.readFile(filePath, 'utf-8', function (err, fileContents) {
  if (err) throw err;
  console.log(JSON.parse(fileContents));
});

No entanto, o arquivo pode ser muito grande, prefiro usar um fluxo para fazer isso. O problema que vejo com um fluxo é que o conteúdo do arquivo pode ser dividido em blocos de dados a qualquer momento, então, como posso usar JSON.parse()nesses objetos?

Idealmente, cada objeto seria lido como um bloco de dados separado, mas não tenho certeza de como fazer isso .

var importStream = fs.createReadStream(filePath, {flags: 'r', encoding: 'utf-8'});
importStream.on('data', function(chunk) {

    var pleaseBeAJSObject = JSON.parse(chunk);           
    // insert pleaseBeAJSObject in a database
});
importStream.on('end', function(item) {
   console.log("Woot, imported objects into the database!");
});*/

Observe, desejo evitar a leitura de todo o arquivo na memória. A eficiência do tempo não importa para mim. Sim, eu poderia tentar ler vários objetos de uma vez e inseri-los todos de uma vez, mas isso é um ajuste de desempenho - preciso de uma maneira que garantidamente não cause uma sobrecarga de memória, não importa quantos objetos estejam contidos no arquivo .

Posso escolher usar FormatAou FormatBou talvez outra coisa, apenas especifique na sua resposta. Obrigado!

dgh
fonte
Para o formato B, você poderia analisar o pedaço para novas linhas e extrair cada linha inteira, concatenando o resto se ela cortar no meio. Pode haver uma maneira mais elegante, no entanto. Não trabalhei muito com streams.
travis

Respostas:

82

Para processar um arquivo linha por linha, você simplesmente precisa separar a leitura do arquivo e o código que atua sobre essa entrada. Você pode fazer isso armazenando em buffer sua entrada até atingir uma nova linha. Supondo que tenhamos um objeto JSON por linha (basicamente, formato B):

var stream = fs.createReadStream(filePath, {flags: 'r', encoding: 'utf-8'});
var buf = '';

stream.on('data', function(d) {
    buf += d.toString(); // when data is read, stash it in a string buffer
    pump(); // then process the buffer
});

function pump() {
    var pos;

    while ((pos = buf.indexOf('\n')) >= 0) { // keep going while there's a newline somewhere in the buffer
        if (pos == 0) { // if there's more than one newline in a row, the buffer will now start with a newline
            buf = buf.slice(1); // discard it
            continue; // so that the next iteration will start with data
        }
        processLine(buf.slice(0,pos)); // hand off the line
        buf = buf.slice(pos+1); // and slice the processed data off the buffer
    }
}

function processLine(line) { // here's where we do something with a line

    if (line[line.length-1] == '\r') line=line.substr(0,line.length-1); // discard CR (0x0D)

    if (line.length > 0) { // ignore empty lines
        var obj = JSON.parse(line); // parse the JSON
        console.log(obj); // do something with the data here!
    }
}

Cada vez que o fluxo de arquivos recebe dados do sistema de arquivos, eles são armazenados em um buffer e pumpsão chamados.

Se não houver nova linha no buffer, pumpsimplesmente retorna sem fazer nada. Mais dados (e potencialmente uma nova linha) serão adicionados ao buffer na próxima vez que o fluxo obtiver dados, e então teremos um objeto completo.

Se houver uma nova linha, pumpcorta o buffer do início até a nova linha e o passa para process. Em seguida, verifica novamente se há outra nova linha no buffer (o whileloop). Dessa forma, podemos processar todas as linhas que foram lidas no trecho atual.

Finalmente, processé chamado uma vez por linha de entrada. Se estiver presente, ele remove o caractere de retorno de carro (para evitar problemas com terminações de linha - LF vs CRLF) e, em seguida, liga para JSON.parsea linha. Neste ponto, você pode fazer o que for necessário com seu objeto.

Observe que JSON.parseé rígido quanto ao que aceita como entrada; você deve citar seus identificadores e valores de string com aspas duplas . Em outras palavras, {name:'thing1'}gerará um erro; você deve usar {"name":"thing1"}.

Como não haverá mais do que um pedaço de dados na memória por vez, isso será extremamente eficiente em termos de memória. Também será extremamente rápido. Um teste rápido mostrou que processei 10.000 linhas em menos de 15 ms.

Josh3736
fonte
12
Esta resposta agora é redundante. Use JSONStream e você terá suporte imediato.
arcseldon
2
O nome da função 'processo' é ruim. 'processo' deve ser uma variável do sistema. Este bug me confundiu por horas.
Zhigong Li
17
@arcseldon Não acho que o fato de haver uma biblioteca que faz isso torne esta resposta redundante. Certamente ainda é útil saber como isso pode ser feito sem o módulo.
Kevin B de
3
Não tenho certeza se isso funcionaria para um arquivo json minificado. E se todo o arquivo estivesse empacotado em uma única linha e o uso de tais delimitadores não fosse possível? Como resolvemos esse problema então?
SLearner
7
Bibliotecas de terceiros não são feitas de mágica, você sabe. Eles são exatamente como esta resposta, versões elaboradas de soluções enroladas à mão, mas apenas embaladas e rotuladas como um programa. Entender como as coisas funcionam é muito mais importante e relevante do que jogar cegamente dados em uma biblioteca esperando resultados. Apenas dizendo :)
zanona
34

Assim como eu estava pensando que seria divertido escrever um analisador JSON de streaming, também pensei que talvez devesse fazer uma pesquisa rápida para ver se já existe um disponível.

Acontece que existe.

Como acabei de encontrá-lo, obviamente não o usei, então não posso comentar sobre sua qualidade, mas ficarei interessado em saber se funciona.

Ele funciona considerando o seguinte Javascript e _.isString:

stream.pipe(JSONStream.parse('*'))
  .on('data', (d) => {
    console.log(typeof d);
    console.log("isString: " + _.isString(d))
  });

Isso registrará os objetos à medida que eles entrarem, se o fluxo for uma matriz de objetos. Portanto, a única coisa sendo armazenada em buffer é um objeto por vez.

user1106925
fonte
29

A partir de outubro de 2014 , você pode fazer algo como o seguinte (usando JSONStream) - https://www.npmjs.org/package/JSONStream

var fs = require('fs'),
    JSONStream = require('JSONStream'),

var getStream() = function () {
    var jsonData = 'myData.json',
        stream = fs.createReadStream(jsonData, { encoding: 'utf8' }),
        parser = JSONStream.parse('*');
    return stream.pipe(parser);
}

getStream().pipe(MyTransformToDoWhateverProcessingAsNeeded).on('error', function (err) {
    // handle any errors
});

Para demonstrar com um exemplo de trabalho:

npm install JSONStream event-stream

data.json:

{
  "greeting": "hello world"
}

hello.js:

var fs = require('fs'),
    JSONStream = require('JSONStream'),
    es = require('event-stream');

var getStream = function () {
    var jsonData = 'data.json',
        stream = fs.createReadStream(jsonData, { encoding: 'utf8' }),
        parser = JSONStream.parse('*');
    return stream.pipe(parser);
};

getStream()
    .pipe(es.mapSync(function (data) {
        console.log(data);
    }));
$ node hello.js
// hello world
Arceldon
fonte
2
Isso é principalmente verdadeiro e útil, mas acho que você precisa fazer parse('*')ou não receberá nenhum dado.
John Zwinck,
@JohnZwinck Obrigado, atualizei a resposta e adicionei um exemplo funcional para demonstrá-lo totalmente.
arcseldon
no primeiro bloco de código, o primeiro conjunto de parênteses var getStream() = function () {deve ser removido.
givemesnacks
1
Isso falhou com um erro de falta de memória com um arquivo json de 500 MB.
Keith John Hutchison de
18

Sei que você deseja evitar a leitura de todo o arquivo JSON na memória, se possível, no entanto, se você tiver memória disponível, pode não ser uma má ideia em termos de desempenho. Usar o require () do node.js em um arquivo json carrega os dados na memória muito rápido.

Executei dois testes para ver como fica o desempenho ao imprimir um atributo de cada recurso de um arquivo geojson de 81 MB.

No primeiro teste, li todo o arquivo geojson na memória usando var data = require('./geo.json'). Isso levou 3.330 milissegundos e, em seguida, imprimir um atributo de cada recurso levou 804 milissegundos para um total geral de 4.134 milissegundos. No entanto, parecia que node.js estava usando 411 MB de memória.

No segundo teste, usei a resposta de @arcseldon com JSONStream + event-stream. Modifiquei a consulta JSONPath para selecionar apenas o que precisava. Desta vez, a memória nunca ultrapassou 82 MB, no entanto, a coisa toda demorou 70 segundos para ser concluída!

Evan Siroky
fonte
18

Eu tinha um requisito semelhante, preciso ler um grande arquivo json no nó js e processar dados em blocos e chamar uma api e salvar no mongodb. inputFile.json é como:

{
 "customers":[
       { /*customer data*/},
       { /*customer data*/},
       { /*customer data*/}....
      ]
}

Agora eu usei JsonStream e EventStream para fazer isso de forma síncrona.

var JSONStream = require("JSONStream");
var es = require("event-stream");

fileStream = fs.createReadStream(filePath, { encoding: "utf8" });
fileStream.pipe(JSONStream.parse("customers.*")).pipe(
  es.through(function(data) {
    console.log("printing one customer object read from file ::");
    console.log(data);
    this.pause();
    processOneCustomer(data, this);
    return data;
  }),
  function end() {
    console.log("stream reading ended");
    this.emit("end");
  }
);

function processOneCustomer(data, es) {
  DataModel.save(function(err, dataModel) {
    es.resume();
  });
}
Karthick N
fonte
Muito obrigado por adicionar sua resposta, meu caso também precisava de algum tratamento síncrono. No entanto, após o teste, não foi possível chamar "end ()" como um retorno de chamada após a conclusão do pipe. Acredito que a única coisa que poderia ser feita é adicionar um evento, o que deve acontecer depois que o fluxo for 'terminado' / 'fechado' com ´fileStream.on ('fechar', ...) ´.
nonNumericalFloat
6

Eu escrevi um módulo que pode fazer isso, chamado BFJ . Especificamente, o método bfj.matchpode ser usado para quebrar um grande fluxo em blocos discretos de JSON:

const bfj = require('bfj');
const fs = require('fs');

const stream = fs.createReadStream(filePath);

bfj.match(stream, (key, value, depth) => depth === 0, { ndjson: true })
  .on('data', object => {
    // do whatever you need to do with object
  })
  .on('dataError', error => {
    // a syntax error was found in the JSON
  })
  .on('error', error => {
    // some kind of operational error occurred
  })
  .on('end', error => {
    // finished processing the stream
  });

Aqui, bfj.matchretorna um fluxo legível no modo de objeto que receberá os itens de dados analisados ​​e recebe 3 argumentos:

  1. Um fluxo legível contendo o JSON de entrada.

  2. Um predicado que indica quais itens do JSON analisado serão enviados para o fluxo de resultados.

  3. Um objeto de opções que indica que a entrada é JSON delimitado por nova linha (isso é para processar o formato B da pergunta, não é necessário para o formato A).

Ao ser chamado, bfj.matchanalisará o JSON do fluxo de entrada em profundidade primeiro, chamando o predicado com cada valor para determinar se deve ou não enviar esse item para o fluxo de resultado. O predicado recebe três argumentos:

  1. A chave de propriedade ou índice de matriz (será undefinedpara itens de nível superior).

  2. O próprio valor.

  3. A profundidade do item na estrutura JSON (zero para itens de nível superior).

É claro que um predicado mais complexo também pode ser usado conforme necessário, de acordo com os requisitos. Você também pode passar uma string ou uma expressão regular em vez de uma função de predicado, se quiser realizar correspondências simples com as chaves de propriedade.

Phil Booth
fonte
4

Resolvi esse problema usando o módulo npm dividido . Divida seu fluxo em uma divisão e " Quebrar um fluxo e remontá-lo de modo que cada linha seja um pedaço ".

Código de amostra:

var fs = require('fs')
  , split = require('split')
  ;

var stream = fs.createReadStream(filePath, {flags: 'r', encoding: 'utf-8'});
var lineStream = stream.pipe(split());
linestream.on('data', function(chunk) {
    var json = JSON.parse(chunk);           
    // ...
});
Brian Leathem
fonte
4

Se você tiver controle sobre o arquivo de entrada e for uma matriz de objetos, poderá resolver isso mais facilmente. Organize a saída do arquivo com cada registro em uma linha, assim:

[
   {"key": value},
   {"key": value},
   ...

Este ainda é um JSON válido.

Em seguida, use o módulo readline node.js para processá-los uma linha por vez.

var fs = require("fs");

var lineReader = require('readline').createInterface({
    input: fs.createReadStream("input.txt")
});

lineReader.on('line', function (line) {
    line = line.trim();

    if (line.charAt(line.length-1) === ',') {
        line = line.substr(0, line.length-1);
    }

    if (line.charAt(0) === '{') {
        processRecord(JSON.parse(line));
    }
});

function processRecord(record) {
    // Process the records one at a time here! 
}
Steve Hanov
fonte
-1

Acho que você precisa usar um banco de dados. MongoDB é uma boa escolha neste caso porque é compatível com JSON.

ATUALIZAÇÃO : você pode usar a ferramenta mongoimport para importar dados JSON para o MongoDB.

mongoimport --collection collection --file collection.json
Vadim Baryshev
fonte
1
Isso não responde à pergunta. Observe que a segunda linha da pergunta diz que ele deseja fazer isso para colocar os dados em um banco de dados .
josh3736
mongoimport importa apenas arquivos de até 16 MB.
Haziq Ahmed