Forçando ~ IPC Node.js síncrono

8

Eu tenho um servidor Nó que cria um processo filho fork()usando IPC. Em algum momento, a criança envia os resultados de volta aos pais em cerca de 10Hz, como parte de uma tarefa de longa duração. Quando a carga transmitida process.send()é pequena, tudo funciona bem: todas as mensagens enviadas são recebidas ~ imediatamente e processadas pelos pais.

No entanto, quando a carga útil é 'grande' - não determinei o limite exato de tamanho -, em vez de ser recebida imediatamente pelo pai, todas as cargas são enviadas primeiro e somente depois que o filho termina sua tarefa de longa execução, o pai recebe e processe as mensagens.

tl; dr visual:

Bom (acontece com pequena carga útil):

child:  send()
parent: receive()
child:  send()
parent: receive()
child:  send()
parent: receive()
...

Ruim (acontece com grande carga):

child:  send()
child:  send()
child:  send()
(repeat many times over many seconds)
...
parent: receive()
parent: receive()
parent: receive()
parent: receive()
...
  1. Isso é um inseto? (Editar: o comportamento ocorre apenas no OS X, não no Windows ou Linux)
  2. Existe alguma maneira de evitar isso, além de tentar manter minha carga útil do IPC pequena?

Edit 2 : o código de exemplo abaixo usa o contador de tempo e iteração para selecionar quando enviar uma atualização. (No meu código atual, também é possível enviar uma atualização após n iterações ou após o loop alcançar determinados resultados.) Como uma reescrita do código para usar setInterval/ em setTimeoutvez de um loop é um último recurso para mim, pois exige que eu para remover recursos.

Edit : Aqui está o código de teste que reproduz o problema. No entanto, ele é reproduzido apenas no OS X, não no Windows ou Linux:

server.js

const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc']};
const child = require('child_process').fork('worker.js', [], opts);

child.on('message', msg => console.log(`parent: receive() ${msg.data.length} bytes`, Date.now()));

require('http').createServer((req, res) => {
   console.log(req.url);
   const match = /\d+/.exec(req.url);
   if (match) {
      child.send(match[0]*1);
      res.writeHead(200, {'Content-Type':'text/plain'});
      res.end(`Sending packets of size ${match[0]}`);
   } else {
      res.writeHead(404, {'Content-Type':'text/plain'});
      res.end('what?');
   }
}).listen(8080);

worker.js

if (process.send) process.on('message', msg => run(msg));

function run(messageSize) {
   const msg = new Array(messageSize+1).join('x');
   let lastUpdate = Date.now();
   for (let i=0; i<1e7; ++i) {
      const now = Date.now();
      if ((now-lastUpdate)>200 || i%5000==0) {
         console.log(`worker: send()  > ${messageSize} bytes`, now);
         process.send({action:'update', data:msg});
         lastUpdate = Date.now();
      }
      Math.sqrt(Math.random());
   }
   console.log('worker done');
}

Cerca de 8k o problema acontece. Por exemplo, ao consultar http://localhost:8080/15vshttp://localhost:8080/123456

/15
worker: send()  > 15 bytes 1571324249029
parent: receive() 15 bytes 1571324249034
worker: send()  > 15 bytes 1571324249235
parent: receive() 15 bytes 1571324249235
worker: send()  > 15 bytes 1571324249436
parent: receive() 15 bytes 1571324249436
worker done
/123456
worker: send()  > 123456 bytes 1571324276973
worker: send()  > 123456 bytes 1571324277174
worker: send()  > 123456 bytes 1571324277375
child done
parent: receive() 123456 bytes 1571324277391
parent: receive() 123456 bytes 1571324277391
parent: receive() 123456 bytes 1571324277393

Experiente nos nós v12.7 e v12.12.

Phrogz
fonte
1
Em vez de enfileirar as mensagens em um loop de bloqueio, por que não usar um setInterval()?
Patrick Roberts
@PatrickRoberts Você está questionando por que run()tem um whileloop nele? Você está sugerindo que mudar isso para setInterval()resolverá meu problema? Para responder à pergunta que acho que você está perguntando: uso um whileloop porque essa função é o único objetivo desse processo de trabalho e (com pequenas cargas de IPC) não causou nenhum problema que eu pudesse ver.
Phrtz 17/10/19
1
Bloquear assim não serve para nenhum propósito benéfico. O uso de um mecanismo de tempo sem bloqueio, como setInterval()libera o loop de eventos para executar E / S em segundo plano. Não estou dizendo que definitivamente resolverá esse problema, mas parece uma escolha estranha escrevê-lo da maneira que você criou, só porque você pode.
Patrick Roberts
@PatrickRoberts Obrigado pela contribuição. Não escrevi dessa maneira "apenas porque posso", mas porque, originalmente, o código era baseado em console sem IPC. Um loop while que imprime resultados periodicamente parecia razoável no momento, mas está enfrentando esse problema (somente no macOS).
`` Phrogz
Escrever um loop de bloqueio que controla o horário atual até que uma condição baseada no tempo seja atendida é um antipadrão no JavaScript, ponto final. Não importa se ele tinha IPC antes ou não. Sempre prefira uma abordagem sem bloqueio usando setTimeout()ou setInterval(). A mudança aqui é trivial.
Patrick Roberts

Respostas:

3

lTer um loop while de longa duração e bloqueio em combinação com soquetes ou descritores de arquivo no nó é sempre uma indicação de que algo é feito de errado.

Sem poder testar toda a configuração, é difícil saber se minha reivindicação está realmente correta, mas é provável que mensagens curtas possam ser passadas diretamente em um pedaço para o sistema operacional, que depois o passa para o outro processo. Com mensagens maiores, o nó precisaria aguardar até que o sistema operacional receba mais dados, para que o envio seja enfileirado e, como você está bloqueando, whileo envio é enfileirado até o loopfinal do tempo .

Portanto, para sua pergunta, não é que isso não seja um bug.

Como você usa uma versão nodejs recente que eu usaria um awaite async, em vez de e criar um sem bloqueio while semelhante ao sleepde esta resposta . O awaitpermitirá que o ciclo de eventos nó para interceptar se processSomeretornos pendentes Promise.

Para o seu código que não reflete realmente um caso de uso real, é difícil saber como resolvê-lo corretamente. Se você não fizer nada assíncrono processSomeque permita a interceptação de E / S, precisará fazer isso manualmente regularmente, como por exemplo, a await new Promise(setImmediate);.

async function run() {
  let interval = setInterval(() => {
    process.send({action:'update', data:status()});
    console.log('child:  send()');
  }, 1/10)

  while(keepGoing()) {
    await processSome();
  }

  clearInterval(interval)
}
t.niese
fonte
Obrigado por esta resposta. De acordo com minha edição da pergunta, meu código real tem várias condições para enviar uma atualização, apenas uma baseada no tempo. Parece que você moveu o processSome()código para fora do whileloop. (Ou talvez eu estou faltando alguma coisa cruciais relacionadas com promessas.)
Phrogz
1
@Phrogz ah ok não, acidentalmente li o aparelho da maneira errada. Atualizei a resposta para que process.send({action:'update', data:status()});seja extraída quando every10Hzfor verdadeira e processSomepara cada iteração do while. O awaitdeve permitir que o EvenLoop do nó intercepte, mesmo processSomeque não retorne uma promessa. Mas a razão do seu problema ainda é que o loop está bloqueando.
t.niese
Dois comentários sobre esta resposta como estão. Se processSome()não retornar uma promessa, essa abordagem ainda bloqueia a E / S (microtasks como a continuação produzida por essa awaitinstrução são processadas antes da E / S). Além disso, isso fará com que a iteração seja executada muito mais lentamente, devido à microtask na fila a cada iteração.
Patrick Roberts
@PatrickRoberts sim, você está certo, ele deve retornar uma promessa não resolvida.
t.niese
2

Em relação à sua primeira pergunta

Isso é um inseto? (Editar: o comportamento ocorre apenas no OS X, não no Windows ou Linux)

Definitivamente, isso não é um bug e eu poderia reproduzi-lo no meu Windows 10 (para o tamanho 123456). É principalmente por causa do buffer do kernel subjacente e da alternância de contexto pelo SO, pois dois processos separados (não desanexados) estão se comunicando através de um descritor de ipc.

Em relação à sua segunda pergunta

Existe alguma maneira de evitar isso, além de tentar manter minha carga útil do IPC pequena?

Se eu entendi o problema corretamente, você está tentando resolver, para cada solicitação http, toda vez que o trabalhador envia um pedaço de volta ao servidor, você deseja que o servidor o processe antes de obter o próximo pedaço. É assim que eu entendo quando você disse que o processamento de sincronização

Existe uma maneira de usar promessas, mas eu gostaria de usar geradores nos trabalhadores. É melhor orquestrar o fluxo entre servidor e trabalhador

Fluxo:

  1. O servidor envia um número inteiro ao trabalhador, o que quer que seja obtido da solicitação http
  2. O Worker cria e executa o gerador para enviar o primeiro pedaço
  3. O trabalhador cede após enviar o pedaço
  4. Solicitações do servidor para mais
  5. Worker gera mais, já que o servidor solicitou mais (somente se disponível)
  6. Se não houver mais, o trabalhador envia o final dos pedaços
  7. O servidor apenas registra que o trabalhador está pronto e não solicita mais

server.js

const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc'], detached:false};
const child = require('child_process').fork('worker.js', [], opts);

child.on('message', (msg) => {
   //FLOW 7: Worker is done, just log
   if (msg.action == 'end'){
      console.log(`child ended for a particular request`)
   } else {
      console.log(`parent: receive(${msg.data.iter}) ${msg.data.msg.length} bytes`, Date.now())
      //FLOW 4: Server requests for more
      child.send('more')
   }   

});

require('http').createServer((req, res) => {
   console.log(req.url);
   const match = /\d+/.exec(req.url);   
   if (match) {
      //FLOW 1: Server sends integer to worker
      child.send(match[0]*1);
      res.writeHead(200, {'Content-Type':'text/plain'});
      res.end(`Sending packets of size ${match[0]}`);
   } else {
      res.writeHead(404, {'Content-Type':'text/plain'});
      res.end('what?');
   }
}).listen(8080);

worker.js

let runner
if (process.send) process.on('message', msg => {   
   //FLOW 2: Worker creates and runs a generator to send the first chunk
   if (parseInt(msg)) {
      runner = run(msg)
      runner.next()
   }
   //FLOW 5: Server asked more, so generate more chunks if available
   if (msg == "more") runner.next()

});

//generator function *
function* run(messageSize) {
   const msg = new Array(messageSize+1).join('x');
   let lastUpdate = Date.now();
   for (let i=0; i<1e7; ++i) {
      const now = Date.now();
      if ((now-lastUpdate)>200 || i%5000==0) {
         console.log(`worker: send(${i})  > ${messageSize} bytes`, now);
         let j = i         
         process.send({action:'update', data:{msg, iter:j}});
         //FLOW 3: Worker yields after sending the chunk
         yield
         lastUpdate = Date.now();
      }
      Math.sqrt(Math.random());
   }
   //FLOW 6: If no more, worker sends end signal
   process.send({action:'end'});
   console.log('worker done');
}

Se conhecermos o caso de uso exato, poderá haver maneiras melhores de programá-lo. Essa é apenas uma maneira de sincronizar o processo filho, mantendo grande parte do seu código-fonte original.

manikawnth
fonte
1

Se você precisar garantir que uma mensagem seja recebida antes de enviar a próxima, aguarde o recebimento do recebimento do mestre. É claro que isso atrasará o envio da próxima mensagem, mas como sua lógica depende do tempo e do número da iteração para determinar se deve enviar uma mensagem, pode ser que seja bom para o seu caso.

A implementação precisará que cada trabalhador crie uma promessa para cada mensagem enviada e aguarde uma resposta do mestre antes de resolvê-la. Isso também significa que você precisa identificar qual mensagem é reconhecida com base em um ID de mensagem ou em algo único, se você tiver mais de uma mensagem ou trabalhador simultaneamente.

aqui está o código modificado

server.js

const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc']};
const child = require('child_process').fork('worker.js', [], opts);

child.on('message', msg =>  {
    console.log(`parent: receive() ${msg.data.length} bytes`, Date.now())
    // reply to the child with the id
    child.send({ type: 'acknowledge', id: msg.id });
});

...

worker.js

const pendingMessageResolves = {};

if (process.send) process.on('message', msg => { 
    if (msg.type === 'acknowledge') {
        // call the stored resolve function
        pendingMessageResolves[msg.id]();
        // remove the function to allow the memory to be freed
        delete pendingMessageResolves[msg.id]
    } else {
        run(msg) 
    }
});

const sendMessageAndWaitForAcknowledge = (msg) => new Promise(resolve => {
    const id = new uuid(); // or any unique field
    process.send({ action:'update', data: msg, id });
    // store a reference to the resolve function
    pendingMessageResolves[id] = resolve;
})

async function run(messageSize) {
    const msg = new Array(messageSize+1).join('x');
    let lastUpdate = Date.now();
    for (let i=0; i<1e7; ++i) {
        const now = Date.now();
        if ((now-lastUpdate)>200 || i%5000==0) {
            console.log(`worker: send()  > ${messageSize} bytes`, now);
            await sendMessageAndWaitForAcknowledge(msg); // wait until master replies
            lastUpdate = Date.now();
        }
        Math.sqrt(Math.random());
    }
    console.log('worker done');
}

ps Eu não testei o código, por isso pode precisar de alguns ajustes, mas a idéia deve valer.

gafi
fonte
1

Embora eu concorde com os outros que a solução ideal seria aquela em que o processo filho voluntariamente renuncia ao controle no final de cada loop, permitindo que os processos de liberação do buffer sejam executados, há uma correção fácil / rápida / suja que deixa você quase síncrono comportamento, e isso é fazer a criança send bloquear chamadas.

Use o mesmo de server.jsantes e quase o mesmoworker.js , com apenas uma linha adicionada:

worker.js

if (process.send) process.on('message', msg => run(msg));

// cause process.send to block until the message is actually sent                                                                                
process.channel.setBlocking(true);

function run(messageSize) {
   const msg = new Array(messageSize+1).join('x');
   let lastUpdate = Date.now();
   for (let i=0; i<1e6; ++i) {
      const now = Date.now();
      if ((now-lastUpdate)>200 || i%5000==0) {
         console.error(`worker: send()  > ${messageSize} bytes`, now);
         process.send({action:'update', data:msg});
         lastUpdate = Date.now();
      }
      Math.sqrt(Math.random());
   }
   console.log('worker done');
}

Resultado:

/123456
worker: send()  > 123456 bytes 1572113820591
worker: send()  > 123456 bytes 1572113820630
parent: receive() 123456 bytes 1572113820629
parent: receive() 123456 bytes 1572113820647
worker: send()  > 123456 bytes 1572113820659
parent: receive() 123456 bytes 1572113820665
worker: send()  > 123456 bytes 1572113820668
parent: receive() 123456 bytes 1572113820678
worker: send()  > 123456 bytes 1572113820678
parent: receive() 123456 bytes 1572113820683
worker: send()  > 123456 bytes 1572113820683
parent: receive() 123456 bytes 1572113820687
worker: send()  > 123456 bytes 1572113820687
worker: send()  > 123456 bytes 1572113820692
parent: receive() 123456 bytes 1572113820692
parent: receive() 123456 bytes 1572113820696
worker: send()  > 123456 bytes 1572113820696
parent: receive() 123456 bytes 1572113820700
worker: send()  > 123456 bytes 1572113820700
parent: receive() 123456 bytes 1572113820703
worker: send()  > 123456 bytes 1572113820703
parent: receive() 123456 bytes 1572113820706
worker: send()  > 123456 bytes 1572113820706
parent: receive() 123456 bytes 1572113820709
worker: send()  > 123456 bytes 1572113820709
parent: receive() 123456 bytes 1572113820713
worker: send()  > 123456 bytes 1572113820714
worker: send()  > 123456 bytes 1572113820721
parent: receive() 123456 bytes 1572113820722
parent: receive() 123456 bytes 1572113820725
worker: send()  > 123456 bytes 1572113820725
parent: receive() 123456 bytes 1572113820727
Old Pro
fonte
Definir a declaração de bloqueio diretamente no código-fonte é uma idéia inteligente. Isso produzirá um gargalo que não pode ser corrigido. O motivo é que o código fonte é armazenado no disco rígido, o que torna problemático o uso de um mecanismo de regras para alterar o comportamento em tempo real.
Manuel Rodriguez