Parallel.ForEach limita o número de threads ativos?

107

Dado este código:

var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
    DoSomething(someString);
});

Todos os 1000 threads serão gerados quase simultaneamente?

Jader Dias
fonte

Respostas:

149

Não, não iniciará 1000 threads - sim, limitará o número de threads usados. As extensões paralelas usam um número apropriado de núcleos, com base em quantos você tem fisicamente e quantos já estão ocupados. Ele aloca trabalho para cada núcleo e, em seguida, usa uma técnica chamada roubo de trabalho para permitir que cada thread processe sua própria fila de forma eficiente e só precisa fazer qualquer acesso caro entre threads quando realmente for necessário.

Dê uma olhada no PFX Team Blog para muitas informações sobre como ele aloca o trabalho e todos os tipos de outros tópicos.

Observe que, em alguns casos, você também pode especificar o grau de paralelismo que deseja.

Jon Skeet
fonte
2
Eu estava usando Parallel.ForEach (FilePathArray, path => ... para ler cerca de 24.000 arquivos hoje à noite, criando um novo arquivo para cada arquivo que li. Código muito simples. Parece que até 6 threads foram suficientes para sobrecarregar o disco de 7200 RPM Eu estava lendo em 100% de utilização. Durante o período de algumas horas, observei a biblioteca Parallel girar em mais de 8.000 threads. Testei usando MaxDegreeOfParallelism e, com certeza, mais de 8.000 threads desapareceram. Eu testei várias vezes agora com o mesmo resultado.
Jake Drew
Ele poderia iniciar 1000 threads para algum degenerado 'DoSomething'. (Como no caso em que estou lidando atualmente com um problema no código de produção que falhou em definir um limite e gerou mais de 200 threads, quebrando assim o pool de conexão SQL .. Eu recomendo definir o DOP máximo para qualquer trabalho que não possa ser raciocinado trivialmente quase como sendo explicitamente vinculado à CPU.)
user2864740
Particionador - docs.microsoft.com/en-us/dotnet/api/…
rafidheen
28

Em uma única máquina de núcleo ... Paralelo.ForEach partições (pedaços) da coleção em que está trabalhando entre uma série de threads, mas esse número é calculado com base em um algoritmo que leva em conta e parece monitorar continuamente o trabalho feito pelo threads está alocando para o ForEach. Portanto, se a parte do corpo do ForEach chama funções de bloqueio / vinculação de E / S de longa execução que deixariam o thread esperando, o algoritmo gerará mais threads e reparticionará a coleção entre eles . Se os threads completam rapidamente e não bloqueiam nos threads de IO, por exemplo, como simplesmente calcular alguns números,o algoritmo aumentará (ou mesmo diminuirá) o número de threads até um ponto em que o algoritmo considera o melhor para o rendimento (tempo médio de conclusão de cada iteração) .

Basicamente, o pool de threads por trás de todas as várias funções da biblioteca Parallel, irá trabalhar em um número ótimo de threads a serem usados. O número de núcleos de processador físico constitui apenas parte da equação. NÃO existe uma relação simples de um para um entre o número de núcleos e o número de threads gerados.

Não considero a documentação sobre o cancelamento e o manuseio de threads de sincronização muito útil. Esperançosamente, a MS pode fornecer melhores exemplos no MSDN.

Não se esqueça, o código do corpo deve ser escrito para rodar em vários threads, junto com todas as considerações de segurança de thread usuais, o framework não abstrai esse fator ... ainda.

Microsoft Developer
fonte
1
"..se a parte do corpo do ForEach chama funções de bloqueio de longa execução que deixariam o thread em espera, o algoritmo gerará mais threads .." - Em casos degenerados, isso significa que pode haver tantos threads criados quanto permitido por ThreadPool.
user2864740
2
Você está certo, para IO ele pode alocar +100 threads enquanto eu me
depuro
5

Ele calcula um número ideal de threads com base no número de processadores / núcleos. Eles não irão gerar todos de uma vez.

Colin Mackay
fonte
5

Consulte O Parallel.For usa uma tarefa por iteração? para ter uma ideia de um "modelo mental" para usar. No entanto, o autor afirma que "No final do dia, é importante lembrar que os detalhes da implementação podem mudar a qualquer momento."

Kevin Hakanson
fonte
4

Ótima pergunta. Em seu exemplo, o nível de paralelização é muito baixo, mesmo em um processador quad core, mas com alguma espera, o nível de paralelização pode ficar muito alto.

// Max concurrency: 5
[Test]
public void Memory_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);
        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Agora veja o que acontece quando uma operação de espera é adicionada para simular uma solicitação HTTP.

// Max concurrency: 34
[Test]
public void Waiting_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Não fiz nenhuma alteração ainda e o nível de simultaneidade / paralelização aumentou drasticamente. A simultaneidade pode ter seu limite aumentado com ParallelOptions.MaxDegreeOfParallelism.

// Max concurrency: 43
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

// Max concurrency: 391
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(100000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Eu recomendo configuração ParallelOptions.MaxDegreeOfParallelism. Isso não aumentará necessariamente o número de threads em uso, mas garantirá que você inicie apenas um número razoável de threads, o que parece ser sua preocupação.

Por último, para responder à sua pergunta, não, você não fará com que todos os tópicos sejam iniciados de uma vez. Use Parallel.Invoke se você deseja invocar em paralelo perfeitamente, por exemplo, testar condições de corrida.

// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623368346
// 636462943623368346
// 636462943623373351
// 636462943623393364
// 636462943623393364
[Test]
public void Test()
{
    ConcurrentBag<string> monitor = new ConcurrentBag<string>();
    ConcurrentBag<string> monitorOut = new ConcurrentBag<string>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(DateTime.UtcNow.Ticks.ToString());
        monitor.TryTake(out string result);
        monitorOut.Add(result);
    });

    var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList();
    Console.WriteLine(string.Join(Environment.NewLine, startTimes.Take(10)));
}
Timothy Gonzalez
fonte