O aninhamento aguarda em Parallel.ForEach

183

Em um aplicativo de metrô, preciso executar várias chamadas WCF. Há um número significativo de chamadas a serem feitas, então eu preciso fazer isso em um loop paralelo. O problema é que o loop paralelo é encerrado antes que as chamadas do WCF sejam concluídas.

Como você refatoraria isso para funcionar como esperado?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();
Darthg8r
fonte

Respostas:

172

A idéia por trás Parallel.ForEach()disso é que você tem um conjunto de threads e cada thread processa parte da coleção. Como você notou, isso não funciona com async- await, no qual você deseja liberar o encadeamento pela duração da chamada assíncrona.

Você pode "consertar" isso bloqueando os ForEach()threads, mas isso derrota todo o ponto de async- await.

O que você pode fazer é usar o TPL Dataflow em vez de Parallel.ForEach(), que suporta Taskbem os assíncronos .

Especificamente, seu código pode ser escrito usando um TransformBlockque transforma cada ID em um Customerusando o asynclambda. Este bloco pode ser configurado para ser executado em paralelo. Você vincularia esse bloco a um ActionBlockque grave cada um Customerno console. Depois de configurar a rede de bloqueio, Post()cada um pode identificar o TransformBlock.

Em código:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

Embora você provavelmente queira limitar o paralelismo da a TransformBlockuma pequena constante. Além disso, você pode limitar a capacidade do TransformBlocke adicionar itens a ele de forma assíncrona SendAsync(), por exemplo, se a coleção for muito grande.

Como um benefício adicional quando comparado ao seu código (se funcionou) é que a gravação começará assim que um único item for concluído e não espere até que todo o processamento esteja concluído.

svick
fonte
2
Uma breve visão geral do assíncrono, extensões reativas, TPL e TPL DataFlow - vantsuyoshi.wordpress.com/2012/01/05/… para pessoas como eu, que podem precisar de alguma clareza.
131313 Norman H
1
Tenho certeza de que essa resposta NÃO paralela o processamento. Eu acredito que você precisa fazer um Parallel.ForEach sobre os IDs e publicá-los no getCustomerBlock. Pelo menos foi o que encontrei quando testei essa sugestão.
JasonLind
4
@JasonLind Realmente faz. Usando Parallel.ForEach()a Post()itens em paralelo não deve ter qualquer efeito real.
svick
1
Ok, eu o encontrei, o ActionBlock também precisa estar em paralelo. Eu estava fazendo isso de maneira um pouco diferente, não precisava de uma transformação, então usei um bufferblock e fiz meu trabalho no ActionBlock. Fiquei confuso com outra resposta nas interwebs.
JasonLind
2
Com o que quero dizer, especificar MaxDegreeOfParallelism no ActionBlock como você faz no TransformBlock no seu exemplo
JasonLind
125

A resposta de svick é (como sempre) excelente.

No entanto, considero o Dataflow mais útil quando você realmente possui grandes quantidades de dados para transferir. Ou quando você precisar de uma asyncfila compatível.

No seu caso, uma solução mais simples é usar apenas o asyncparalelismo do estilo:

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}

Console.ReadKey();
Stephen Cleary
fonte
13
Se você quisesse limitar manualmente o paralelismo (o que você provavelmente faz neste caso), fazê-lo dessa maneira seria mais complicado.
svick
1
Mas você está certo de que o Dataflow pode ser bastante complicado (por exemplo, quando comparado com Parallel.ForEach()). Mas acho que atualmente é a melhor opção para fazer praticamente qualquer asynctrabalho com coleções.
svick
1
@JamesManning como ParallelOptionsvai ajudar? É aplicável apenas a Parallel.For/ForEach/Invoke, que, como o OP estabelecido, não tem utilidade aqui.
Oade Schneider
1
@StephenCleary Se o GetCustomermétodo está retornando a Task<T>, Deveria estar usando Select(async i => { await repo.GetCustomer(i);});?
Shyju 04/04
5
@batmaci: Parallel.ForEachnão suporta async.
Stephen Cleary
81

Usar o DataFlow como svick sugeriu pode ser um exagero, e a resposta de Stephen não fornece os meios para controlar a simultaneidade da operação. No entanto, isso pode ser alcançado de maneira simples:

public static async Task RunWithMaxDegreeOfConcurrency<T>(
     int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted); 
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    {
        //observe exceptions in a manner consistent with the above   
    });
}

As ToArray()chamadas podem ser otimizadas usando uma matriz em vez de uma lista e substituindo tarefas concluídas, mas duvido que isso faria muita diferença na maioria dos cenários. Exemplo de uso de acordo com a pergunta do OP:

RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

O usuário EDIT Fellow SO e o especialista em TPL Eli Arbel me indicaram um artigo relacionado de Stephen Toub . Como sempre, sua implementação é elegante e eficiente:

public static Task ForEachAsync<T>(
      this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current).ContinueWith(t => 
                          {
                              //observe exceptions
                          });

        })); 
}
Ohad Schneider
fonte
1
@RichardPierre, na verdade, essa sobrecarga de Partitioner.Createparticionamento usa pedaços , que fornece elementos dinamicamente para as diferentes tarefas, para que o cenário que você descreveu não ocorra. Observe também que o particionamento estático (predeterminado) pode ser mais rápido em alguns casos devido a menos sobrecarga (especificamente sincronização). Para obter mais informações, consulte: msdn.microsoft.com/en-us/library/dd997411(v=vs.110).aspx .
Ohad Schneider
1
@OhadSchneider No // observe as exceções, se isso der uma exceção, isso ocorrerá até o chamador? Por exemplo, se eu quisesse que todo o enumerável parasse o processamento / falha, se alguma parte dele falhar?
Terry
3
@Terry fará uma borbulha para o chamador no sentido de que a tarefa principal (criada por Task.WhenAll) conterá a exceção (dentro de um AggregateException) e, consequentemente, se o chamador for usado await, uma exceção será lançada no site da chamada. No entanto, Task.WhenAllainda aguardará a conclusão de todas as tarefas e GetPartitionsalocará elementos dinamicamente quando partition.MoveNextfor chamado até que não haja mais elementos para processar. Isso significa que, a menos que você adicione seu próprio mecanismo para interromper o processamento (por exemplo CancellationToken), isso não acontecerá por si próprio.
Ohad Schneider
1
@gibbocool Ainda não tenho certeza se sigo. Suponha que você tenha um total de 7 tarefas, com os parâmetros especificados em seu comentário. Suponha ainda que o primeiro lote execute tarefas ocasionais de 5 segundos e três tarefas de 1 segundo. Após cerca de um segundo, a tarefa de 5 segundos ainda estará em execução, enquanto as três tarefas de 1 segundo serão concluídas. Nesse ponto, as três tarefas de 1 segundo restantes começarão a ser executadas (elas seriam fornecidas pelo particionador aos três threads "livres").
Ohad Schneider 28/08
2
@MichaelFreidgeim, você pode fazer algo como var current = partition.Currentantes await bodye depois usar currentna continuação ( ContinueWith(t => { ... }).
Ohad Schneider
43

Você pode economizar esforço com o novo pacote NuGet do AsyncEnumerator , que não existia há 4 anos quando a pergunta foi originalmente publicada. Permite controlar o grau de paralelismo:

using System.Collections.Async;
...

await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);

Isenção de responsabilidade: sou o autor da biblioteca AsyncEnumerator, de código aberto e licenciada pelo MIT, e estou postando esta mensagem apenas para ajudar a comunidade.

Serge Semenov
fonte
11
Sergey, você deve divulgar que é um autor da biblioteca
Michael Freidgeim
5
ok, adicionou o aviso. Não estou buscando nenhum benefício em publicá-lo, só quero ajudar as pessoas;)
Serge Semenov
Sua biblioteca não é compatível com o .NET Core.
Corniel Nobel
2
@CornielNobel, é compatível com o .NET Core - o código-fonte no GitHub tem uma cobertura de teste para o .NET Framework e o .NET Core.
Serge Semenov
1
@SergeSemenov Eu usei bastante sua biblioteca AsyncStreamse tenho que dizer que é excelente. Não é possível recomendar esta biblioteca o suficiente.
WBuck 9/10/19
16

Embrulhe a Parallel.Foreachem Task.Run()e, em vez da awaitpalavra - chave use[yourasyncmethod].Result

(você precisa executar a tarefa Task.Run para não bloquear o thread da interface do usuário)

Algo assim:

var yourForeachTask = Task.Run(() =>
        {
            Parallel.ForEach(ids, i =>
            {
                ICustomerRepo repo = new CustomerRepo();
                var cust = repo.GetCustomer(i).Result;
                customers.Add(cust);
            });
        });
await yourForeachTask;
ofcoursedude
fonte
3
Qual é o problema com isso? Eu teria feito exatamente assim. Vamos Parallel.ForEachfazer o trabalho paralelo, que bloqueia até que tudo esteja pronto, e depois enviar a coisa toda para um thread em segundo plano para ter uma interface de usuário responsiva. Algum problema com isso? Talvez esse seja um segmento adormecido demais, mas é um código curto e legível.
ygoe 17/06/2015
@LonelyPixel Meu único problema é que ele liga Task.Runquando TaskCompletionSourceé preferível.
Gusdor 30/03/16
1
@Gusdor Curious - por que é TaskCompletionSourcepreferível?
Seafish
@ Seafish Uma boa pergunta que eu gostaria de poder responder. Deve ter sido um dia difícil: D
Gusdor 13/07/16
Apenas uma pequena atualização. Eu estava procurando exatamente isso agora, rolei para baixo para encontrar a solução mais simples e encontrei meu próprio comentário novamente. Eu usei exatamente esse código e funciona como esperado. Ele assume apenas que há uma versão Sync das chamadas Async originais dentro do loop. awaitpode ser movido na frente para salvar o nome extra da variável.
ygoe
7

Isso deve ser bastante eficiente e mais fácil do que fazer com que todo o fluxo de dados TPL funcione:

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}
John Gietzen
fonte
O exemplo de uso não deve usar awaitcomo var customers = await ids.SelectAsync(async i => { ... });:?
Paccc
5

Estou um pouco atrasado para a festa, mas você pode considerar usar GetAwaiter.GetResult () para executar seu código assíncrono no contexto de sincronização, mas tão paralelo quanto abaixo;

 Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});
Teoman shipahi
fonte
5

Um método de extensão para isso que utiliza o SemaphoreSlim e também permite definir o grau máximo de paralelismo

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Uso da amostra:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
Jay Shah
fonte
5

Depois de introduzir vários métodos auxiliares, você poderá executar consultas paralelas com esta sintaxe simples:

const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
    .Split(DegreeOfParallelism)
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
    .ConfigureAwait(false);

O que acontece aqui é: dividimos a coleção de fontes em 10 partes ( .Split(DegreeOfParallelism)), depois executamos 10 tarefas, cada uma processando seus itens um por um ( .SelectManyAsync(...)) e mesclando-os novamente em uma única lista.

Vale ressaltar que existe uma abordagem mais simples:

double[] result2 = await Enumerable.Range(0, 1000000)
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false))
    .WhenAll()
    .ConfigureAwait(false);

Mas ele precisa de uma precaução : se você tiver uma coleção de fontes muito grande, ela agendará um Taskpara cada item imediatamente, o que pode causar impactos significativos no desempenho.

Os métodos de extensão usados ​​nos exemplos acima são os seguintes:

public static class CollectionExtensions
{
    /// <summary>
    /// Splits collection into number of collections of nearly equal size.
    /// </summary>
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
    {
        if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));

        List<T> source = src.ToList();
        var sourceIndex = 0;
        for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
        {
            var list = new List<T>();
            int itemsLeft = source.Count - targetIndex;
            while (slicesCount * list.Count < itemsLeft)
            {
                list.Add(source[sourceIndex++]);
            }

            yield return list;
        }
    }

    /// <summary>
    /// Takes collection of collections, projects those in parallel and merges results.
    /// </summary>
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
        this IEnumerable<IEnumerable<T>> source,
        Func<T, Task<TResult>> func)
    {
        List<TResult>[] slices = await source
            .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
            .WhenAll()
            .ConfigureAwait(false);
        return slices.SelectMany(s => s);
    }

    /// <summary>Runs selector and awaits results.</summary>
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        List<TResult> result = new List<TResult>();
        foreach (TSource source1 in source)
        {
            TResult result1 = await selector(source1).ConfigureAwait(false);
            result.Add(result1);
        }
        return result;
    }

    /// <summary>Wraps tasks with Task.WhenAll.</summary>
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
    {
        return Task.WhenAll<TResult>(source);
    }
}
Vitaliy Ulantikov
fonte