Qual é a melhor maneira de limitar a simultaneidade ao usar Promise.all () do ES6?

108

Eu tenho um código que está iterando em uma lista que foi consultada em um banco de dados e fazendo uma solicitação HTTP para cada elemento dessa lista. Essa lista às vezes pode ser um número razoavelmente grande (na casa dos milhares), e eu gostaria de ter certeza de que não estou acessando um servidor da web com milhares de solicitações HTTP simultâneas.

Uma versão abreviada deste código atualmente se parece com isto ...

function getCounts() {
  return users.map(user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
      });
    });
  });
}

Promise.all(getCounts()).then(() => { /* snip */});

Este código está sendo executado no Nó 4.3.2. Para reiterar, pode Promise.allser administrado de forma que apenas um certo número de Promessas estejam em andamento em um determinado momento?

Chris
fonte
3
Não se esqueça de que Promise.allgerencia a progressão das promessas - as promessas fazem isso sozinhas, Promise.allapenas esperam por elas.
Bergi
1
Evite o Promiseantipadrão do construtor !
Bergi

Respostas:

53

Observe que Promise.all()não aciona as promessas para iniciar seu trabalho, mas criar a promessa em si sim.

Com isso em mente, uma solução seria verificar sempre que uma promessa foi resolvida se uma nova promessa deve ser iniciada ou se você já está no limite.

No entanto, realmente não há necessidade de reinventar a roda aqui. Uma biblioteca que você pode usar para esse propósito ées6-promise-pool . Dos exemplos deles:

// On the Web, leave out this line and use the script tag above instead. 
var PromisePool = require('es6-promise-pool')

var promiseProducer = function () {
  // Your code goes here. 
  // If there is work left to be done, return the next work item as a promise. 
  // Otherwise, return null to indicate that all promises have been created. 
  // Scroll down for an example. 
}

// The number of promises to process simultaneously. 
var concurrency = 3

// Create a pool. 
var pool = new PromisePool(promiseProducer, concurrency)

// Start the pool. 
var poolPromise = pool.start()

// Wait for the pool to settle. 
poolPromise.then(function () {
  console.log('All promises fulfilled')
}, function (error) {
  console.log('Some promise rejected: ' + error.message)
})
Timo
fonte
26
É uma pena que es6-Promessa-pool reinventa Promise em vez de usá-la. Em vez disso, sugiro esta solução concisa (se você já estiver usando ES6 ou ES7) github.com/rxaviers/async-pool
Rafael Xavier
3
Deu uma olhada em ambos, async-pool parece muito melhor! Mais direto e mais leve.
Infinito de
3
Também descobri que p-limit é a implementação mais simples. Veja meu exemplo abaixo. stackoverflow.com/a/52262024/8177355
Matthew Rideout
2
Eu acho que tiny-asyc-pool é uma solução muito melhor, não intrusiva e bastante natural para limitar a simultaneidade de promessas.
Sunny Tambi
77

P-Limit

Eu comparei a limitação de concorrência de promessa com um script personalizado, bluebird, es6-promessa-pool e p-limit. Eu acredito que p-limit tem a implementação mais simples e enxuta para essa necessidade. Veja sua documentação .

Requisitos

Para ser compatível com async no exemplo

Meu exemplo

Neste exemplo, precisamos executar uma função para cada URL na matriz (como, talvez, uma solicitação de API). Aqui isso é chamado fetchData(). Se tivéssemos uma matriz de milhares de itens para processar, a simultaneidade definitivamente seria útil para economizar recursos de CPU e memória.

const pLimit = require('p-limit');

// Example Concurrency of 3 promise at once
const limit = pLimit(3);

let urls = [
    "http://www.exampleone.com/",
    "http://www.exampletwo.com/",
    "http://www.examplethree.com/",
    "http://www.examplefour.com/",
]

// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url => {

    // wrap the function we are calling in the limit function we defined above
    return limit(() => fetchData(url));
});

(async () => {
    // Only three promises are run at once (as defined above)
    const result = await Promise.all(promises);
    console.log(result);
})();

O resultado do log do console é uma matriz dos dados de resposta das promessas resolvidas.

Matthew Rideout
fonte
4
Obrigado por este! Este é muito mais simples
João
3
Esta foi de longe a melhor biblioteca que vi para limitar solicitações simultâneas. E ótimo exemplo, obrigado!
Chris Livdahl de
3
Obrigado por fazer a comparação. Você comparou github.com/rxaviers/async-pool ?
ahong
1
Fácil de usar, ótima escolha.
drmrbrewer
1
além disso, tem cerca de 20 milhões de downloads semanais em npm por semana contra cerca de 200-100k para outras bibliotecas mencionadas em outras respostas.
vir
25

Usando Array.prototype.splice

while (funcs.length) {
  // 100 at at time
  await Promise.all( funcs.splice(0, 100).map(f => f()) )
}
caviar desacelerado
fonte
3
Esta é uma solução subestimada. Ame a simplicidade.
Brannon de
12
Isso executa funções em lotes em vez de pool, onde uma função é chamada imediatamente quando a outra termina.
cltsang
Adorei essa solução!
prasun de
levou um segundo para entender o que está fazendo com a falta de mais contexto em torno dele, como ser um lote em vez de um pool, por exemplo. Você está reordenando a matriz toda vez que unir do início ou no meio. (o navegador deve reindexar todos os itens) uma alternativa melhor de desempenho teórico é tirar as coisas do final, em vez de arr.splice(-100)se a dose do pedido não for diferente, talvez você possa inverter a matriz: P
Sem fim
2
Muito útil para execução em lotes. Observação: o próximo lote não começará até que o lote atual esteja 100% concluído.
Casey Dwayne
24

Se você sabe como os iteradores funcionam e como são consumidos, não precisa de nenhuma biblioteca extra, pois pode se tornar muito fácil construir sua própria simultaneidade. Deixe-me demonstrar:

/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()


// loop over all items with for..of
for (const x of iterator) {
  console.log('x:', x)
  
  // notices how this loop continues the same iterator
  // and consumes the rest of the iterator, making the
  // outer loop not logging any more x's
  for (const y of iterator) {
    console.log('y:', y)
  }
}

Podemos usar o mesmo iterador e compartilhá-lo entre os trabalhadores.

Se você tivesse usado em .entries()vez de .values()você teria obtido um array 2D com o [[index, value]]qual irei demonstrar abaixo com uma simultaneidade de 2

const sleep = t => new Promise(rs => setTimeout(rs, t))

async function doWork(iterator) {
  for (let [index, item] of iterator) {
    await sleep(1000)
    console.log(index + ': ' + item)
  }
}

const iterator = Array.from('abcdefghij').entries()
const workers = new Array(2).fill(iterator).map(doWork)
//    ^--- starts two workers sharing the same iterator

Promise.allSettled(workers).then(() => console.log('done'))

A vantagem disso é que você pode ter uma função de gerador em vez de ter tudo pronto de uma vez.


Observação: a diferença disso em comparação com o pool assíncrono de exemplo é que ele gera dois trabalhadores, portanto, se um trabalhador lançar um erro por algum motivo no índice 5, ele não impedirá o outro trabalhador de fazer o resto. Então você vai de 2 simultaneidade para 1. (então não vai parar por aí) Então meu conselho é que você pegue todos os erros dentro da doWorkfunção

Sem fim
fonte
Isso é incrível! Obrigado Endless!
user3413723
Esta é definitivamente uma abordagem legal! Apenas certifique-se de que sua simultaneidade não exceda o comprimento de sua lista de tarefas (se você se preocupa com os resultados de qualquer maneira), pois você pode acabar com extras!
Kris Oye,
Algo que pode ser mais legal mais tarde é quando o Streams obtém o suporte Readable.from (iterador) . O Chrome já tornou os streams transferíveis . portanto, você poderia criar fluxos legíveis e enviá-los para um web workers, e todos eles acabariam usando o mesmo iterador subjacente.
Fim de
16

O Promise.map do bluebird pode ter uma opção de simultaneidade para controlar quantas promessas devem ser executadas em paralelo. Às vezes é mais fácil do que .allporque você não precisa criar o array de promessa.

const Promise = require('bluebird')

function getCounts() {
  return Promise.map(users, user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
       });
    });
  }, {concurrency: 10}); // <---- at most 10 http requests at a time
}
Jingshao Chen
fonte
bluebird é difícil se você precisar de promessas mais rápidas e lixo extra de ~ 18kb se você usá-lo apenas para uma coisa;)
Interminável
2
Tudo depende da importância de uma coisa para você e se há outra maneira melhor e mais rápida / fácil. Uma troca típica. Vou escolher facilidade de uso e função em alguns kb, mas YMMV.
Jingshao Chen
13

Em vez de usar promessas para limitar as solicitações http, use o http.Agent.maxSockets integrado do nó . Isso remove a necessidade de usar uma biblioteca ou escrever seu próprio código de pool e tem a vantagem adicional de mais controle sobre o que você está limitando.

agent.maxSockets

Por padrão, definido como Infinity. Determina quantos soquetes simultâneos o agente pode ter abertos por origem. Origem é uma combinação de 'host: porta' ou 'host: porta: localAddress'.

Por exemplo:

var http = require('http');
var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin
var request = http.request({..., agent: agent}, ...);

Se estiver fazendo várias solicitações para a mesma origem, também pode ser benéfico definir keepAlivecomo verdadeiro (consulte os documentos acima para obter mais informações).

tcooc
fonte
12
Ainda assim, criar milhares de fechamentos imediatamente e soquetes de pool não parece ser muito eficiente?
Bergi
3

Eu sugiro a biblioteca async-pool: https://github.com/rxaviers/async-pool

npm install tiny-async-pool

Descrição:

Execute várias funções de retorno de promessa e assíncronas com simultaneidade limitada usando ES6 / ES7 nativo

asyncPool executa várias funções de retorno de promessa e assíncronas em um pool de simultaneidade limitado. Ele rejeita imediatamente assim que uma das promessas é rejeitada. Resolve quando todas as promessas são concluídas. Ele chama a função iteradora assim que possível (sob o limite de simultaneidade).

Uso:

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
Venryx
fonte
1
Funciona para mim. Obrigado. Esta é uma ótima biblioteca.
Sunny Tambi
2

Isso pode ser resolvido usando recursão.

A ideia é que inicialmente você envie o número máximo permitido de solicitações e cada uma dessas solicitações deve continuar a se enviar recursivamente após a sua conclusão.

function batchFetch(urls, concurrentRequestsLimit) {
    return new Promise(resolve => {
        var documents = [];
        var index = 0;

        function recursiveFetch() {
            if (index === urls.length) {
                return;
            }
            fetch(urls[index++]).then(r => {
                documents.push(r.text());
                if (documents.length === urls.length) {
                    resolve(documents);
                } else {
                    recursiveFetch();
                }
            });
        }

        for (var i = 0; i < concurrentRequestsLimit; i++) {
            recursiveFetch();
        }
    });
}

var sources = [
    'http://www.example_1.com/',
    'http://www.example_2.com/',
    'http://www.example_3.com/',
    ...
    'http://www.example_100.com/'
];
batchFetch(sources, 5).then(documents => {
   console.log(documents);
});
Anton Fil
fonte
2

Aqui está minha solução ES7 para copiar e colar amigável e recurso completo Promise.all()/ map()alternativo, com um limite de simultaneidade.

Semelhante a Promise.all()ele mantém a ordem de devolução, bem como um fallback para valores de retorno não prometidos.

Também incluí uma comparação das diferentes implementações, pois ela ilustra alguns aspectos que algumas das outras soluções deixaram de lado.

Uso

const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay));
const args = [30, 20, 15, 10];
await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4

Implementação

async function asyncBatch(args, fn, limit = 8) {
  // Copy arguments to avoid side effects
  args = [...args];
  const outs = [];
  while (args.length) {
    const batch = args.splice(0, limit);
    const out = await Promise.all(batch.map(fn));
    outs.push(...out);
  }
  return outs;
}

async function asyncPool(args, fn, limit = 8) {
  return new Promise((resolve) => {
    // Copy arguments to avoid side effect, reverse queue as
    // pop is faster than shift
    const argQueue = [...args].reverse();
    let count = 0;
    const outs = [];
    const pollNext = () => {
      if (argQueue.length === 0 && count === 0) {
        resolve(outs);
      } else {
        while (count < limit && argQueue.length) {
          const index = args.length - argQueue.length;
          const arg = argQueue.pop();
          count += 1;
          const out = fn(arg);
          const processOut = (out, index) => {
            outs[index] = out;
            count -= 1;
            pollNext();
          };
          if (typeof out === 'object' && out.then) {
            out.then(out => processOut(out, index));
          } else {
            processOut(out, index);
          }
        }
      }
    };
    pollNext();
  });
}

Comparação

// A simple async function that returns after the given delay
// and prints its value to allow us to determine the response order
const asyncFn = delay => new Promise(resolve => setTimeout(() => {
  console.log(delay);
  resolve(delay);
}, delay));

// List of arguments to the asyncFn function
const args = [30, 20, 15, 10];

// As a comparison of the different implementations, a low concurrency
// limit of 2 is used in order to highlight the performance differences.
// If a limit greater than or equal to args.length is used the results
// would be identical.

// Vanilla Promise.all/map combo
const out1 = await Promise.all(args.map(arg => asyncFn(arg)));
// prints: 10, 15, 20, 30
// total time: 30ms

// Pooled implementation
const out2 = await asyncPool(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 15, 10
// total time: 40ms

// Batched implementation
const out3 = await asyncBatch(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 20, 30
// total time: 45ms

console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3

// Conclusion: Execution order and performance is different,
// but return order is still identical

Conclusão

asyncPool() deve ser a melhor solução, pois permite que novas solicitações sejam iniciadas assim que as anteriores forem concluídas.

asyncBatch() está incluído como uma comparação porque sua implementação é mais simples de entender, mas deve ter um desempenho mais lento, pois todas as solicitações no mesmo lote precisam terminar para iniciar o próximo lote.

Neste exemplo inventado, a baunilha não limitada Promise.all() é obviamente o mais rápido, enquanto os outros poderiam ter um desempenho mais desejável em um cenário de congestionamento do mundo real.

Atualizar

A biblioteca de pool assíncrono que outros já sugeriram é provavelmente uma alternativa melhor para minha implementação, pois funciona quase de forma idêntica e tem uma implementação mais concisa com um uso inteligente de Promise.race (): https://github.com/rxaviers/ async-pool / blob / master / lib / es7.js

Espero que minha resposta ainda possa servir a um valor educacional.

Adelost
fonte
1

Aqui vai um exemplo básico de streaming e 'p-limit'. Ele transmite o fluxo de leitura de http para mongo db.

const stream = require('stream');
const util = require('util');
const pLimit = require('p-limit');
const es = require('event-stream');
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;


const pipeline = util.promisify(stream.pipeline)

const outputDBConfig = {
    dbURL: 'yr-db-url',
    collection: 'some-collection'
};
const limit = pLimit(3);

async yrAsyncStreamingFunction(readStream) => {
        const mongoWriteStream = streamToMongoDB(outputDBConfig);
        const mapperStream = es.map((data, done) => {
                let someDataPromise = limit(() => yr_async_call_to_somewhere())

                    someDataPromise.then(
                        function handleResolve(someData) {

                            data.someData = someData;    
                            done(null, data);
                        },
                        function handleError(error) {
                            done(error)
                        }
                    );
                })

            await pipeline(
                readStream,
                JSONStream.parse('*'),
                mapperStream,
                mongoWriteStream
            );
        }
gosuer1921
fonte
0

Tentei fazer com que alguns exemplos mostrados funcionassem para meu código, mas como isso era apenas para um script de importação e não para um código de produção, usar o pacote npm batch-promises foi certamente o caminho mais fácil para mim

NOTA: Requer tempo de execução para suportar Promise ou para ser polyfilled.

Api batchPromises (int: batchSize, array: Collection, i => Promise: Iteratee) A Promise: Iteratee será chamada após cada lote.

Usar:

batch-promises
Easily batch promises

NOTE: Requires runtime to support Promise or to be polyfilled.

Api
batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee)
The Promise: Iteratee will be called after each batch.

Use:
import batchPromises from 'batch-promises';
 
batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => {
 
  // The iteratee will fire after each batch resulting in the following behaviour:
  // @ 100ms resolve items 1 and 2 (first batch of 2)
  // @ 200ms resolve items 3 and 4 (second batch of 2)
  // @ 300ms resolve remaining item 5 (last remaining batch)
  setTimeout(() => {
    resolve(i);
  }, 100);
}))
.then(results => {
  console.log(results); // [1,2,3,4,5]
});

Agusti Fernandez Pardo
fonte
0

A recursão é a resposta se você não quiser usar bibliotecas externas

downloadAll(someArrayWithData){
  var self = this;

  var tracker = function(next){
    return self.someExpensiveRequest(someArrayWithData[next])
    .then(function(){
      next++;//This updates the next in the tracker function parameter
      if(next < someArrayWithData.length){//Did I finish processing all my data?
        return tracker(next);//Go to the next promise
      }
    });
  }

  return tracker(0); 
}
Juan
fonte
0

Isso é o que eu fiz usando Promise.race, dentro do meu código aqui

const identifyTransactions = async function() {
  let promises = []
  let concurrency = 0
  for (let tx of this.transactions) {
    if (concurrency > 4)
      await Promise.race(promises).then(r => { promises = []; concurrency = 0 })
    promises.push(tx.identifyTransaction())
    concurrency++
  }
  if (promises.length > 0)
    await Promise.race(promises) //resolve the rest
}

Se você quiser ver um exemplo: https://jsfiddle.net/thecodermarcelo/av2tp83o/5/

Alex
fonte
2
Eu não chamaria isso de simultaneidade ... Isso é mais parecido com a execução em lote ... Você faz 4 tarefas, espera que todas terminem e depois faz as próximas 4. se uma delas resolver antes, você ainda espera que as outras 3 terminem , o que você deve usar éPromise.race
infinito
0
  • @tcoocA resposta de foi bem legal. Não sabia sobre isso e aproveitarei isso no futuro.
  • Também gostei da resposta de @MatthewRideout , mas ele usa uma biblioteca externa !!

Sempre que possível, procuro desenvolver esse tipo de coisa por conta própria, em vez de ir para uma biblioteca. Você acaba aprendendo muitos conceitos que pareciam assustadores antes.

O que vocês acham dessa tentativa:
(eu pensei muito sobre isso e acho que está funcionando, mas indique se não estiver ou se há algo fundamentalmente errado)

 class Pool{
        constructor(maxAsync) {
            this.maxAsync = maxAsync;
            this.asyncOperationsQueue = [];
            this.currentAsyncOperations = 0
        }

        runAnother() {
            if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync) {
                this.currentAsyncOperations += 1;
                this.asyncOperationsQueue.pop()()
                    .then(() => { this.currentAsyncOperations -= 1; this.runAnother() }, () => { this.currentAsyncOperations -= 1; this.runAnother() })
            }
        }

        add(f){  // the argument f is a function of signature () => Promise
            this.runAnother();
            return new Promise((resolve, reject) => {
                this.asyncOperationsQueue.push(
                    () => f().then(resolve).catch(reject)
                )
            })
        }
    }

//#######################################################
//                        TESTS
//#######################################################

function dbCall(id, timeout, fail) {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            if (fail) {
               reject(`Error for id ${id}`);
            } else {
                resolve(id);
            }
        }, timeout)
    }
    )
}


const dbQuery1 = () => dbCall(1, 5000, false);
const dbQuery2 = () => dbCall(2, 5000, false);
const dbQuery3 = () => dbCall(3, 5000, false);
const dbQuery4 = () => dbCall(4, 5000, true);
const dbQuery5 = () => dbCall(5, 5000, false);


const cappedPool = new Pool(2);

const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: ${i}`))

Esta abordagem fornece uma boa API, semelhante aos conjuntos de threads em scala / java.
Depois de criar uma instância do pool com const cappedPool = new Pool(2), você faz promessas a ela simplesmente cappedPool.add(() => myPromise).
Obviamente, devemos garantir que a promessa não comece imediatamente e é por isso que devemos "fornecê-la preguiçosamente" com a ajuda da função.

Mais importante ainda, observe que o resultado do método add é uma promessa que será cumprida / resolvida com o valor de sua promessa original ! Isso torna o uso muito intuitivo.

const resultPromise = cappedPool.add( () => dbCall(...))
resultPromise
.then( actualResult => {
   // Do something with the result form the DB
  }
)
Carlos Teixeira
fonte
0

Infelizmente, não há como fazer isso com Promise.all nativo, então você precisa ser criativo.

Esta é a maneira mais rápida e concisa que consegui encontrar sem usar nenhuma biblioteca externa.

Ele faz uso de um recurso javascript mais recente denominado iterador. O iterador basicamente controla quais itens foram processados ​​e quais não foram.

Para usá-lo no código, você cria uma matriz de funções assíncronas. Cada função assíncrona pede ao mesmo iterador o próximo item que precisa ser processado. Cada função processa seu próprio item de forma assíncrona e, quando concluído, solicita um novo item ao iterador. Quando o iterador fica sem itens, todas as funções são concluídas.

Obrigado a @Endless pela inspiração.

var items = [
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
];

var concurrency = 5

Array(concurrency).fill(items.entries()).map(async (cursor) => {
    for(let [index, url] of cursor){
        console.log("getting url is ", index, url);
        // run your async task instead of this next line
        var text = await fetch(url).then(res => res.text());
        console.log("text is", text.slice(0,20));
    }
})

user3413723
fonte
Curioso para saber por que isso foi marcado. É muito semelhante ao que eu criei.
Kris Oye,
0

Tantas boas soluções. Comecei com a solução elegante postada por @Endless e acabei com este pequeno método de extensão que não usa nenhuma biblioteca externa nem é executado em lotes (embora presuma que você tenha recursos como async, etc):

Promise.allWithLimit = async (taskList, limit = 5) => {
    const iterator = taskList.entries();
    let results = new Array(taskList.length);
    let workerThreads = new Array(limit).fill(0).map(() => 
        new Promise(async (resolve, reject) => {
            try {
                let entry = iterator.next();
                while (!entry.done) {
                    let [index, promise] = entry.value;
                    try {
                        results[index] = await promise;
                        entry = iterator.next();
                    }
                    catch (err) {
                        results[index] = err;
                    }
                }
                // No more work to do
                resolve(true); 
            }
            catch (err) {
                // This worker is dead
                reject(err);
            }
        }));

    await Promise.all(workerThreads);
    return results;
};

    Promise.allWithLimit = async (taskList, limit = 5) => {
        const iterator = taskList.entries();
        let results = new Array(taskList.length);
        let workerThreads = new Array(limit).fill(0).map(() => 
            new Promise(async (resolve, reject) => {
                try {
                    let entry = iterator.next();
                    while (!entry.done) {
                        let [index, promise] = entry.value;
                        try {
                            results[index] = await promise;
                            entry = iterator.next();
                        }
                        catch (err) {
                            results[index] = err;
                        }
                    }
                    // No more work to do
                    resolve(true); 
                }
                catch (err) {
                    // This worker is dead
                    reject(err);
                }
            }));
    
        await Promise.all(workerThreads);
        return results;
    };

    const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve => {
       let n = (i + 1) * 5;
       setTimeout(() => {
          console.log(`Did nothing for ${n} seconds`);
          resolve(n);
       }, n * 1000);
    }));

    var results = Promise.allWithLimit(demoTasks);

Kris Oye
fonte
0

expandindo a resposta postada por @deceleratedcaviar, criei uma função de utilidade 'batch' que leva como argumento: array de valores, limite de simultaneidade e função de processamento. Sim, eu percebo que usar Promise.all dessa forma é mais parecido com o processamento em lote do que com a verdadeira simultaneidade, mas se o objetivo é limitar o número excessivo de chamadas HTTP de uma vez, eu uso essa abordagem devido à sua simplicidade e não há necessidade de biblioteca externa .

async function batch(o) {
  let arr = o.arr
  let resp = []
  while (arr.length) {
    let subset = arr.splice(0, o.limit)
    let results = await Promise.all(subset.map(o.process))
    resp.push(results)
  }
  return [].concat.apply([], resp)
}

let arr = []
for (let i = 0; i < 250; i++) { arr.push(i) }

async function calc(val) { return val * 100 }

(async () => {
  let resp = await batch({
    arr: arr,
    limit: 100,
    process: calc
  })
  console.log(resp)
})();

Eugene Blinn
fonte