Eu tenho um código que está iterando em uma lista que foi consultada em um banco de dados e fazendo uma solicitação HTTP para cada elemento dessa lista. Essa lista às vezes pode ser um número razoavelmente grande (na casa dos milhares), e eu gostaria de ter certeza de que não estou acessando um servidor da web com milhares de solicitações HTTP simultâneas.
Uma versão abreviada deste código atualmente se parece com isto ...
function getCounts() {
return users.map(user => {
return new Promise(resolve => {
remoteServer.getCount(user) // makes an HTTP request
.then(() => {
/* snip */
resolve();
});
});
});
}
Promise.all(getCounts()).then(() => { /* snip */});
Este código está sendo executado no Nó 4.3.2. Para reiterar, pode Promise.all
ser administrado de forma que apenas um certo número de Promessas estejam em andamento em um determinado momento?
javascript
node.js
es6-promise
Chris
fonte
fonte
Promise.all
gerencia a progressão das promessas - as promessas fazem isso sozinhas,Promise.all
apenas esperam por elas.Promise
antipadrão do construtor !Respostas:
Observe que
Promise.all()
não aciona as promessas para iniciar seu trabalho, mas criar a promessa em si sim.Com isso em mente, uma solução seria verificar sempre que uma promessa foi resolvida se uma nova promessa deve ser iniciada ou se você já está no limite.
No entanto, realmente não há necessidade de reinventar a roda aqui. Uma biblioteca que você pode usar para esse propósito é
es6-promise-pool
. Dos exemplos deles:// On the Web, leave out this line and use the script tag above instead. var PromisePool = require('es6-promise-pool') var promiseProducer = function () { // Your code goes here. // If there is work left to be done, return the next work item as a promise. // Otherwise, return null to indicate that all promises have been created. // Scroll down for an example. } // The number of promises to process simultaneously. var concurrency = 3 // Create a pool. var pool = new PromisePool(promiseProducer, concurrency) // Start the pool. var poolPromise = pool.start() // Wait for the pool to settle. poolPromise.then(function () { console.log('All promises fulfilled') }, function (error) { console.log('Some promise rejected: ' + error.message) })
fonte
P-Limit
Eu comparei a limitação de concorrência de promessa com um script personalizado, bluebird, es6-promessa-pool e p-limit. Eu acredito que p-limit tem a implementação mais simples e enxuta para essa necessidade. Veja sua documentação .
Requisitos
Para ser compatível com async no exemplo
Meu exemplo
Neste exemplo, precisamos executar uma função para cada URL na matriz (como, talvez, uma solicitação de API). Aqui isso é chamado
fetchData()
. Se tivéssemos uma matriz de milhares de itens para processar, a simultaneidade definitivamente seria útil para economizar recursos de CPU e memória.const pLimit = require('p-limit'); // Example Concurrency of 3 promise at once const limit = pLimit(3); let urls = [ "http://www.exampleone.com/", "http://www.exampletwo.com/", "http://www.examplethree.com/", "http://www.examplefour.com/", ] // Create an array of our promises using map (fetchData() returns a promise) let promises = urls.map(url => { // wrap the function we are calling in the limit function we defined above return limit(() => fetchData(url)); }); (async () => { // Only three promises are run at once (as defined above) const result = await Promise.all(promises); console.log(result); })();
O resultado do log do console é uma matriz dos dados de resposta das promessas resolvidas.
fonte
Usando
Array.prototype.splice
while (funcs.length) { // 100 at at time await Promise.all( funcs.splice(0, 100).map(f => f()) ) }
fonte
arr.splice(-100)
se a dose do pedido não for diferente, talvez você possa inverter a matriz: PSe você sabe como os iteradores funcionam e como são consumidos, não precisa de nenhuma biblioteca extra, pois pode se tornar muito fácil construir sua própria simultaneidade. Deixe-me demonstrar:
/* [Symbol.iterator]() is equivalent to .values() const iterator = [1,2,3][Symbol.iterator]() */ const iterator = [1,2,3].values() // loop over all items with for..of for (const x of iterator) { console.log('x:', x) // notices how this loop continues the same iterator // and consumes the rest of the iterator, making the // outer loop not logging any more x's for (const y of iterator) { console.log('y:', y) } }
Podemos usar o mesmo iterador e compartilhá-lo entre os trabalhadores.
Se você tivesse usado em
.entries()
vez de.values()
você teria obtido um array 2D com o[[index, value]]
qual irei demonstrar abaixo com uma simultaneidade de 2const sleep = t => new Promise(rs => setTimeout(rs, t)) async function doWork(iterator) { for (let [index, item] of iterator) { await sleep(1000) console.log(index + ': ' + item) } } const iterator = Array.from('abcdefghij').entries() const workers = new Array(2).fill(iterator).map(doWork) // ^--- starts two workers sharing the same iterator Promise.allSettled(workers).then(() => console.log('done'))
A vantagem disso é que você pode ter uma função de gerador em vez de ter tudo pronto de uma vez.
Observação: a diferença disso em comparação com o pool assíncrono de exemplo é que ele gera dois trabalhadores, portanto, se um trabalhador lançar um erro por algum motivo no índice 5, ele não impedirá o outro trabalhador de fazer o resto. Então você vai de 2 simultaneidade para 1. (então não vai parar por aí) Então meu conselho é que você pegue todos os erros dentro da
doWork
funçãofonte
O Promise.map do bluebird pode ter uma opção de simultaneidade para controlar quantas promessas devem ser executadas em paralelo. Às vezes é mais fácil do que
.all
porque você não precisa criar o array de promessa.const Promise = require('bluebird') function getCounts() { return Promise.map(users, user => { return new Promise(resolve => { remoteServer.getCount(user) // makes an HTTP request .then(() => { /* snip */ resolve(); }); }); }, {concurrency: 10}); // <---- at most 10 http requests at a time }
fonte
Em vez de usar promessas para limitar as solicitações http, use o http.Agent.maxSockets integrado do nó . Isso remove a necessidade de usar uma biblioteca ou escrever seu próprio código de pool e tem a vantagem adicional de mais controle sobre o que você está limitando.
Por exemplo:
var http = require('http'); var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin var request = http.request({..., agent: agent}, ...);
Se estiver fazendo várias solicitações para a mesma origem, também pode ser benéfico definir
keepAlive
como verdadeiro (consulte os documentos acima para obter mais informações).fonte
Eu sugiro a biblioteca async-pool: https://github.com/rxaviers/async-pool
npm install tiny-async-pool
Descrição:
Uso:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i)); await asyncPool(2, [1000, 5000, 3000, 2000], timeout); // Call iterator (i = 1000) // Call iterator (i = 5000) // Pool limit of 2 reached, wait for the quicker one to complete... // 1000 finishes // Call iterator (i = 3000) // Pool limit of 2 reached, wait for the quicker one to complete... // 3000 finishes // Call iterator (i = 2000) // Itaration is complete, wait until running ones complete... // 5000 finishes // 2000 finishes // Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
fonte
Isso pode ser resolvido usando recursão.
A ideia é que inicialmente você envie o número máximo permitido de solicitações e cada uma dessas solicitações deve continuar a se enviar recursivamente após a sua conclusão.
function batchFetch(urls, concurrentRequestsLimit) { return new Promise(resolve => { var documents = []; var index = 0; function recursiveFetch() { if (index === urls.length) { return; } fetch(urls[index++]).then(r => { documents.push(r.text()); if (documents.length === urls.length) { resolve(documents); } else { recursiveFetch(); } }); } for (var i = 0; i < concurrentRequestsLimit; i++) { recursiveFetch(); } }); } var sources = [ 'http://www.example_1.com/', 'http://www.example_2.com/', 'http://www.example_3.com/', ... 'http://www.example_100.com/' ]; batchFetch(sources, 5).then(documents => { console.log(documents); });
fonte
Aqui está minha solução ES7 para copiar e colar amigável e recurso completo
Promise.all()
/map()
alternativo, com um limite de simultaneidade.Semelhante a
Promise.all()
ele mantém a ordem de devolução, bem como um fallback para valores de retorno não prometidos.Também incluí uma comparação das diferentes implementações, pois ela ilustra alguns aspectos que algumas das outras soluções deixaram de lado.
Uso
const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay)); const args = [30, 20, 15, 10]; await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4
Implementação
async function asyncBatch(args, fn, limit = 8) { // Copy arguments to avoid side effects args = [...args]; const outs = []; while (args.length) { const batch = args.splice(0, limit); const out = await Promise.all(batch.map(fn)); outs.push(...out); } return outs; } async function asyncPool(args, fn, limit = 8) { return new Promise((resolve) => { // Copy arguments to avoid side effect, reverse queue as // pop is faster than shift const argQueue = [...args].reverse(); let count = 0; const outs = []; const pollNext = () => { if (argQueue.length === 0 && count === 0) { resolve(outs); } else { while (count < limit && argQueue.length) { const index = args.length - argQueue.length; const arg = argQueue.pop(); count += 1; const out = fn(arg); const processOut = (out, index) => { outs[index] = out; count -= 1; pollNext(); }; if (typeof out === 'object' && out.then) { out.then(out => processOut(out, index)); } else { processOut(out, index); } } } }; pollNext(); }); }
Comparação
// A simple async function that returns after the given delay // and prints its value to allow us to determine the response order const asyncFn = delay => new Promise(resolve => setTimeout(() => { console.log(delay); resolve(delay); }, delay)); // List of arguments to the asyncFn function const args = [30, 20, 15, 10]; // As a comparison of the different implementations, a low concurrency // limit of 2 is used in order to highlight the performance differences. // If a limit greater than or equal to args.length is used the results // would be identical. // Vanilla Promise.all/map combo const out1 = await Promise.all(args.map(arg => asyncFn(arg))); // prints: 10, 15, 20, 30 // total time: 30ms // Pooled implementation const out2 = await asyncPool(args, arg => asyncFn(arg), 2); // prints: 20, 30, 15, 10 // total time: 40ms // Batched implementation const out3 = await asyncBatch(args, arg => asyncFn(arg), 2); // prints: 20, 30, 20, 30 // total time: 45ms console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3 // Conclusion: Execution order and performance is different, // but return order is still identical
Conclusão
asyncPool()
deve ser a melhor solução, pois permite que novas solicitações sejam iniciadas assim que as anteriores forem concluídas.asyncBatch()
está incluído como uma comparação porque sua implementação é mais simples de entender, mas deve ter um desempenho mais lento, pois todas as solicitações no mesmo lote precisam terminar para iniciar o próximo lote.Neste exemplo inventado, a baunilha não limitada
Promise.all()
é obviamente o mais rápido, enquanto os outros poderiam ter um desempenho mais desejável em um cenário de congestionamento do mundo real.Atualizar
A biblioteca de pool assíncrono que outros já sugeriram é provavelmente uma alternativa melhor para minha implementação, pois funciona quase de forma idêntica e tem uma implementação mais concisa com um uso inteligente de Promise.race (): https://github.com/rxaviers/ async-pool / blob / master / lib / es7.js
Espero que minha resposta ainda possa servir a um valor educacional.
fonte
Aqui vai um exemplo básico de streaming e 'p-limit'. Ele transmite o fluxo de leitura de http para mongo db.
const stream = require('stream'); const util = require('util'); const pLimit = require('p-limit'); const es = require('event-stream'); const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB; const pipeline = util.promisify(stream.pipeline) const outputDBConfig = { dbURL: 'yr-db-url', collection: 'some-collection' }; const limit = pLimit(3); async yrAsyncStreamingFunction(readStream) => { const mongoWriteStream = streamToMongoDB(outputDBConfig); const mapperStream = es.map((data, done) => { let someDataPromise = limit(() => yr_async_call_to_somewhere()) someDataPromise.then( function handleResolve(someData) { data.someData = someData; done(null, data); }, function handleError(error) { done(error) } ); }) await pipeline( readStream, JSONStream.parse('*'), mapperStream, mongoWriteStream ); }
fonte
Tentei fazer com que alguns exemplos mostrados funcionassem para meu código, mas como isso era apenas para um script de importação e não para um código de produção, usar o pacote npm batch-promises foi certamente o caminho mais fácil para mim
NOTA: Requer tempo de execução para suportar Promise ou para ser polyfilled.
Api batchPromises (int: batchSize, array: Collection, i => Promise: Iteratee) A Promise: Iteratee será chamada após cada lote.
Usar:
batch-promises Easily batch promises NOTE: Requires runtime to support Promise or to be polyfilled. Api batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee) The Promise: Iteratee will be called after each batch. Use: import batchPromises from 'batch-promises'; batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => { // The iteratee will fire after each batch resulting in the following behaviour: // @ 100ms resolve items 1 and 2 (first batch of 2) // @ 200ms resolve items 3 and 4 (second batch of 2) // @ 300ms resolve remaining item 5 (last remaining batch) setTimeout(() => { resolve(i); }, 100); })) .then(results => { console.log(results); // [1,2,3,4,5] });
fonte
A recursão é a resposta se você não quiser usar bibliotecas externas
downloadAll(someArrayWithData){ var self = this; var tracker = function(next){ return self.someExpensiveRequest(someArrayWithData[next]) .then(function(){ next++;//This updates the next in the tracker function parameter if(next < someArrayWithData.length){//Did I finish processing all my data? return tracker(next);//Go to the next promise } }); } return tracker(0); }
fonte
Isso é o que eu fiz usando
Promise.race
, dentro do meu código aquiconst identifyTransactions = async function() { let promises = [] let concurrency = 0 for (let tx of this.transactions) { if (concurrency > 4) await Promise.race(promises).then(r => { promises = []; concurrency = 0 }) promises.push(tx.identifyTransaction()) concurrency++ } if (promises.length > 0) await Promise.race(promises) //resolve the rest }
Se você quiser ver um exemplo: https://jsfiddle.net/thecodermarcelo/av2tp83o/5/
fonte
Promise.race
Sempre que possível, procuro desenvolver esse tipo de coisa por conta própria, em vez de ir para uma biblioteca. Você acaba aprendendo muitos conceitos que pareciam assustadores antes.
O que vocês acham dessa tentativa:
(eu pensei muito sobre isso e acho que está funcionando, mas indique se não estiver ou se há algo fundamentalmente errado)
class Pool{ constructor(maxAsync) { this.maxAsync = maxAsync; this.asyncOperationsQueue = []; this.currentAsyncOperations = 0 } runAnother() { if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync) { this.currentAsyncOperations += 1; this.asyncOperationsQueue.pop()() .then(() => { this.currentAsyncOperations -= 1; this.runAnother() }, () => { this.currentAsyncOperations -= 1; this.runAnother() }) } } add(f){ // the argument f is a function of signature () => Promise this.runAnother(); return new Promise((resolve, reject) => { this.asyncOperationsQueue.push( () => f().then(resolve).catch(reject) ) }) } } //####################################################### // TESTS //####################################################### function dbCall(id, timeout, fail) { return new Promise((resolve, reject) => { setTimeout(() => { if (fail) { reject(`Error for id ${id}`); } else { resolve(id); } }, timeout) } ) } const dbQuery1 = () => dbCall(1, 5000, false); const dbQuery2 = () => dbCall(2, 5000, false); const dbQuery3 = () => dbCall(3, 5000, false); const dbQuery4 = () => dbCall(4, 5000, true); const dbQuery5 = () => dbCall(5, 5000, false); const cappedPool = new Pool(2); const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: ${i}`))
Esta abordagem fornece uma boa API, semelhante aos conjuntos de threads em scala / java.
Depois de criar uma instância do pool com
const cappedPool = new Pool(2)
, você faz promessas a ela simplesmentecappedPool.add(() => myPromise)
.Obviamente, devemos garantir que a promessa não comece imediatamente e é por isso que devemos "fornecê-la preguiçosamente" com a ajuda da função.
Mais importante ainda, observe que o resultado do método
add
é uma promessa que será cumprida / resolvida com o valor de sua promessa original ! Isso torna o uso muito intuitivo.const resultPromise = cappedPool.add( () => dbCall(...)) resultPromise .then( actualResult => { // Do something with the result form the DB } )
fonte
Infelizmente, não há como fazer isso com Promise.all nativo, então você precisa ser criativo.
Esta é a maneira mais rápida e concisa que consegui encontrar sem usar nenhuma biblioteca externa.
Ele faz uso de um recurso javascript mais recente denominado iterador. O iterador basicamente controla quais itens foram processados e quais não foram.
Para usá-lo no código, você cria uma matriz de funções assíncronas. Cada função assíncrona pede ao mesmo iterador o próximo item que precisa ser processado. Cada função processa seu próprio item de forma assíncrona e, quando concluído, solicita um novo item ao iterador. Quando o iterador fica sem itens, todas as funções são concluídas.
Obrigado a @Endless pela inspiração.
var items = [ "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", ]; var concurrency = 5 Array(concurrency).fill(items.entries()).map(async (cursor) => { for(let [index, url] of cursor){ console.log("getting url is ", index, url); // run your async task instead of this next line var text = await fetch(url).then(res => res.text()); console.log("text is", text.slice(0,20)); } })
fonte
Tantas boas soluções. Comecei com a solução elegante postada por @Endless e acabei com este pequeno método de extensão que não usa nenhuma biblioteca externa nem é executado em lotes (embora presuma que você tenha recursos como async, etc):
Promise.allWithLimit = async (taskList, limit = 5) => { const iterator = taskList.entries(); let results = new Array(taskList.length); let workerThreads = new Array(limit).fill(0).map(() => new Promise(async (resolve, reject) => { try { let entry = iterator.next(); while (!entry.done) { let [index, promise] = entry.value; try { results[index] = await promise; entry = iterator.next(); } catch (err) { results[index] = err; } } // No more work to do resolve(true); } catch (err) { // This worker is dead reject(err); } })); await Promise.all(workerThreads); return results; };
Promise.allWithLimit = async (taskList, limit = 5) => { const iterator = taskList.entries(); let results = new Array(taskList.length); let workerThreads = new Array(limit).fill(0).map(() => new Promise(async (resolve, reject) => { try { let entry = iterator.next(); while (!entry.done) { let [index, promise] = entry.value; try { results[index] = await promise; entry = iterator.next(); } catch (err) { results[index] = err; } } // No more work to do resolve(true); } catch (err) { // This worker is dead reject(err); } })); await Promise.all(workerThreads); return results; }; const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve => { let n = (i + 1) * 5; setTimeout(() => { console.log(`Did nothing for ${n} seconds`); resolve(n); }, n * 1000); })); var results = Promise.allWithLimit(demoTasks);
fonte
expandindo a resposta postada por @deceleratedcaviar, criei uma função de utilidade 'batch' que leva como argumento: array de valores, limite de simultaneidade e função de processamento. Sim, eu percebo que usar Promise.all dessa forma é mais parecido com o processamento em lote do que com a verdadeira simultaneidade, mas se o objetivo é limitar o número excessivo de chamadas HTTP de uma vez, eu uso essa abordagem devido à sua simplicidade e não há necessidade de biblioteca externa .
async function batch(o) { let arr = o.arr let resp = [] while (arr.length) { let subset = arr.splice(0, o.limit) let results = await Promise.all(subset.map(o.process)) resp.push(results) } return [].concat.apply([], resp) } let arr = [] for (let i = 0; i < 250; i++) { arr.push(i) } async function calc(val) { return val * 100 } (async () => { let resp = await batch({ arr: arr, limit: 100, process: calc }) console.log(resp) })();
fonte