Eu tenho um servidor Nó que cria um processo filho fork()
usando IPC. Em algum momento, a criança envia os resultados de volta aos pais em cerca de 10Hz, como parte de uma tarefa de longa duração. Quando a carga transmitida process.send()
é pequena, tudo funciona bem: todas as mensagens enviadas são recebidas ~ imediatamente e processadas pelos pais.
No entanto, quando a carga útil é 'grande' - não determinei o limite exato de tamanho -, em vez de ser recebida imediatamente pelo pai, todas as cargas são enviadas primeiro e somente depois que o filho termina sua tarefa de longa execução, o pai recebe e processe as mensagens.
tl; dr visual:
Bom (acontece com pequena carga útil):
child: send()
parent: receive()
child: send()
parent: receive()
child: send()
parent: receive()
...
Ruim (acontece com grande carga):
child: send()
child: send()
child: send()
(repeat many times over many seconds)
...
parent: receive()
parent: receive()
parent: receive()
parent: receive()
...
- Isso é um inseto? (Editar: o comportamento ocorre apenas no OS X, não no Windows ou Linux)
- Existe alguma maneira de evitar isso, além de tentar manter minha carga útil do IPC pequena?
Edit 2 : o código de exemplo abaixo usa o contador de tempo e iteração para selecionar quando enviar uma atualização. (No meu código atual, também é possível enviar uma atualização após n iterações ou após o loop alcançar determinados resultados.) Como uma reescrita do código para usar setInterval
/ em setTimeout
vez de um loop é um último recurso para mim, pois exige que eu para remover recursos.
Edit : Aqui está o código de teste que reproduz o problema. No entanto, ele é reproduzido apenas no OS X, não no Windows ou Linux:
server.js
const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc']};
const child = require('child_process').fork('worker.js', [], opts);
child.on('message', msg => console.log(`parent: receive() ${msg.data.length} bytes`, Date.now()));
require('http').createServer((req, res) => {
console.log(req.url);
const match = /\d+/.exec(req.url);
if (match) {
child.send(match[0]*1);
res.writeHead(200, {'Content-Type':'text/plain'});
res.end(`Sending packets of size ${match[0]}`);
} else {
res.writeHead(404, {'Content-Type':'text/plain'});
res.end('what?');
}
}).listen(8080);
worker.js
if (process.send) process.on('message', msg => run(msg));
function run(messageSize) {
const msg = new Array(messageSize+1).join('x');
let lastUpdate = Date.now();
for (let i=0; i<1e7; ++i) {
const now = Date.now();
if ((now-lastUpdate)>200 || i%5000==0) {
console.log(`worker: send() > ${messageSize} bytes`, now);
process.send({action:'update', data:msg});
lastUpdate = Date.now();
}
Math.sqrt(Math.random());
}
console.log('worker done');
}
Cerca de 8k o problema acontece. Por exemplo, ao consultar http://localhost:8080/15
vshttp://localhost:8080/123456
/15
worker: send() > 15 bytes 1571324249029
parent: receive() 15 bytes 1571324249034
worker: send() > 15 bytes 1571324249235
parent: receive() 15 bytes 1571324249235
worker: send() > 15 bytes 1571324249436
parent: receive() 15 bytes 1571324249436
worker done
/123456
worker: send() > 123456 bytes 1571324276973
worker: send() > 123456 bytes 1571324277174
worker: send() > 123456 bytes 1571324277375
child done
parent: receive() 123456 bytes 1571324277391
parent: receive() 123456 bytes 1571324277391
parent: receive() 123456 bytes 1571324277393
Experiente nos nós v12.7 e v12.12.
fonte
setInterval()
?run()
tem umwhile
loop nele? Você está sugerindo que mudar isso parasetInterval()
resolverá meu problema? Para responder à pergunta que acho que você está perguntando: uso umwhile
loop porque essa função é o único objetivo desse processo de trabalho e (com pequenas cargas de IPC) não causou nenhum problema que eu pudesse ver.setInterval()
libera o loop de eventos para executar E / S em segundo plano. Não estou dizendo que definitivamente resolverá esse problema, mas parece uma escolha estranha escrevê-lo da maneira que você criou, só porque você pode.setTimeout()
ousetInterval()
. A mudança aqui é trivial.Respostas:
lTer um loop while de longa duração e bloqueio em combinação com soquetes ou descritores de arquivo no nó é sempre uma indicação de que algo é feito de errado.
Sem poder testar toda a configuração, é difícil saber se minha reivindicação está realmente correta, mas é provável que mensagens curtas possam ser passadas diretamente em um pedaço para o sistema operacional, que depois o passa para o outro processo. Com mensagens maiores, o nó precisaria aguardar até que o sistema operacional receba mais dados, para que o envio seja enfileirado e, como você está bloqueando,
while
o envio é enfileirado até oloop
final do tempo .Portanto, para sua pergunta, não é que isso não seja um bug.
Como você usa uma versão nodejs recente que eu usaria um
await
easync
, em vez de e criar um sem bloqueiowhile
semelhante aosleep
de esta resposta . Oawait
permitirá que o ciclo de eventos nó para interceptar seprocessSome
retornos pendentes Promise.Para o seu código que não reflete realmente um caso de uso real, é difícil saber como resolvê-lo corretamente. Se você não fizer nada assíncrono
processSome
que permita a interceptação de E / S, precisará fazer isso manualmente regularmente, como por exemplo, aawait new Promise(setImmediate);
.fonte
processSome()
código para fora dowhile
loop. (Ou talvez eu estou faltando alguma coisa cruciais relacionadas com promessas.)process.send({action:'update', data:status()});
seja extraída quandoevery10Hz
for verdadeira eprocessSome
para cada iteração dowhile
. Oawait
deve permitir que o EvenLoop do nó intercepte, mesmoprocessSome
que não retorne uma promessa. Mas a razão do seu problema ainda é que o loop está bloqueando.processSome()
não retornar uma promessa, essa abordagem ainda bloqueia a E / S (microtasks como a continuação produzida por essaawait
instrução são processadas antes da E / S). Além disso, isso fará com que a iteração seja executada muito mais lentamente, devido à microtask na fila a cada iteração.Em relação à sua primeira pergunta
Definitivamente, isso não é um bug e eu poderia reproduzi-lo no meu Windows 10 (para o tamanho 123456). É principalmente por causa do buffer do kernel subjacente e da alternância de contexto pelo SO, pois dois processos separados (não desanexados) estão se comunicando através de um descritor de ipc.
Em relação à sua segunda pergunta
Se eu entendi o problema corretamente, você está tentando resolver, para cada solicitação http, toda vez que o trabalhador envia um pedaço de volta ao servidor, você deseja que o servidor o processe antes de obter o próximo pedaço. É assim que eu entendo quando você disse que o processamento de sincronização
Existe uma maneira de usar promessas, mas eu gostaria de usar geradores nos trabalhadores. É melhor orquestrar o fluxo entre servidor e trabalhador
Fluxo:
server.js
worker.js
Se conhecermos o caso de uso exato, poderá haver maneiras melhores de programá-lo. Essa é apenas uma maneira de sincronizar o processo filho, mantendo grande parte do seu código-fonte original.
fonte
Se você precisar garantir que uma mensagem seja recebida antes de enviar a próxima, aguarde o recebimento do recebimento do mestre. É claro que isso atrasará o envio da próxima mensagem, mas como sua lógica depende do tempo e do número da iteração para determinar se deve enviar uma mensagem, pode ser que seja bom para o seu caso.
A implementação precisará que cada trabalhador crie uma promessa para cada mensagem enviada e aguarde uma resposta do mestre antes de resolvê-la. Isso também significa que você precisa identificar qual mensagem é reconhecida com base em um ID de mensagem ou em algo único, se você tiver mais de uma mensagem ou trabalhador simultaneamente.
aqui está o código modificado
server.js
worker.js
ps Eu não testei o código, por isso pode precisar de alguns ajustes, mas a idéia deve valer.
fonte
Embora eu concorde com os outros que a solução ideal seria aquela em que o processo filho voluntariamente renuncia ao controle no final de cada loop, permitindo que os processos de liberação do buffer sejam executados, há uma correção fácil / rápida / suja que deixa você quase síncrono comportamento, e isso é fazer a criança
send
bloquear chamadas.Use o mesmo de
server.js
antes e quase o mesmoworker.js
, com apenas uma linha adicionada:worker.js
Resultado:
fonte