A idéia por trás Parallel.ForEach()
disso é que você tem um conjunto de threads e cada thread processa parte da coleção. Como você notou, isso não funciona com async
- await
, no qual você deseja liberar o encadeamento pela duração da chamada assíncrona.
Você pode "consertar" isso bloqueando os ForEach()
threads, mas isso derrota todo o ponto de async
- await
.
O que você pode fazer é usar o TPL Dataflow em vez de Parallel.ForEach()
, que suporta Task
bem os assíncronos .
Especificamente, seu código pode ser escrito usando um TransformBlock
que transforma cada ID em um Customer
usando o async
lambda. Este bloco pode ser configurado para ser executado em paralelo. Você vincularia esse bloco a um ActionBlock
que grave cada um Customer
no console. Depois de configurar a rede de bloqueio, Post()
cada um pode identificar o TransformBlock
.
Em código:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
writeCustomerBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
foreach (var id in ids)
getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
Embora você provavelmente queira limitar o paralelismo da a TransformBlock
uma pequena constante. Além disso, você pode limitar a capacidade do TransformBlock
e adicionar itens a ele de forma assíncrona SendAsync()
, por exemplo, se a coleção for muito grande.
Como um benefício adicional quando comparado ao seu código (se funcionou) é que a gravação começará assim que um único item for concluído e não espere até que todo o processamento esteja concluído.
Parallel.ForEach()
aPost()
itens em paralelo não deve ter qualquer efeito real.A resposta de svick é (como sempre) excelente.
No entanto, considero o Dataflow mais útil quando você realmente possui grandes quantidades de dados para transferir. Ou quando você precisar de uma
async
fila compatível.No seu caso, uma solução mais simples é usar apenas o
async
paralelismo do estilo:fonte
Parallel.ForEach()
). Mas acho que atualmente é a melhor opção para fazer praticamente qualquerasync
trabalho com coleções.ParallelOptions
vai ajudar? É aplicável apenas aParallel.For/ForEach/Invoke
, que, como o OP estabelecido, não tem utilidade aqui.GetCustomer
método está retornando aTask<T>
, Deveria estar usandoSelect(async i => { await repo.GetCustomer(i);});
?Parallel.ForEach
não suportaasync
.Usar o DataFlow como svick sugeriu pode ser um exagero, e a resposta de Stephen não fornece os meios para controlar a simultaneidade da operação. No entanto, isso pode ser alcançado de maneira simples:
As
ToArray()
chamadas podem ser otimizadas usando uma matriz em vez de uma lista e substituindo tarefas concluídas, mas duvido que isso faria muita diferença na maioria dos cenários. Exemplo de uso de acordo com a pergunta do OP:O usuário EDIT Fellow SO e o especialista em TPL Eli Arbel me indicaram um artigo relacionado de Stephen Toub . Como sempre, sua implementação é elegante e eficiente:
fonte
Partitioner.Create
particionamento usa pedaços , que fornece elementos dinamicamente para as diferentes tarefas, para que o cenário que você descreveu não ocorra. Observe também que o particionamento estático (predeterminado) pode ser mais rápido em alguns casos devido a menos sobrecarga (especificamente sincronização). Para obter mais informações, consulte: msdn.microsoft.com/en-us/library/dd997411(v=vs.110).aspx .Task.WhenAll
) conterá a exceção (dentro de umAggregateException
) e, consequentemente, se o chamador for usadoawait
, uma exceção será lançada no site da chamada. No entanto,Task.WhenAll
ainda aguardará a conclusão de todas as tarefas eGetPartitions
alocará elementos dinamicamente quandopartition.MoveNext
for chamado até que não haja mais elementos para processar. Isso significa que, a menos que você adicione seu próprio mecanismo para interromper o processamento (por exemploCancellationToken
), isso não acontecerá por si próprio.var current = partition.Current
antesawait body
e depois usarcurrent
na continuação (ContinueWith(t => { ... }
).Você pode economizar esforço com o novo pacote NuGet do AsyncEnumerator , que não existia há 4 anos quando a pergunta foi originalmente publicada. Permite controlar o grau de paralelismo:
Isenção de responsabilidade: sou o autor da biblioteca AsyncEnumerator, de código aberto e licenciada pelo MIT, e estou postando esta mensagem apenas para ajudar a comunidade.
fonte
AsyncStreams
e tenho que dizer que é excelente. Não é possível recomendar esta biblioteca o suficiente.Embrulhe a
Parallel.Foreach
emTask.Run()
e, em vez daawait
palavra - chave use[yourasyncmethod].Result
(você precisa executar a tarefa Task.Run para não bloquear o thread da interface do usuário)
Algo assim:
fonte
Parallel.ForEach
fazer o trabalho paralelo, que bloqueia até que tudo esteja pronto, e depois enviar a coisa toda para um thread em segundo plano para ter uma interface de usuário responsiva. Algum problema com isso? Talvez esse seja um segmento adormecido demais, mas é um código curto e legível.Task.Run
quandoTaskCompletionSource
é preferível.TaskCompletionSource
preferível?await
pode ser movido na frente para salvar o nome extra da variável.Isso deve ser bastante eficiente e mais fácil do que fazer com que todo o fluxo de dados TPL funcione:
fonte
await
comovar customers = await ids.SelectAsync(async i => { ... });
:?Estou um pouco atrasado para a festa, mas você pode considerar usar GetAwaiter.GetResult () para executar seu código assíncrono no contexto de sincronização, mas tão paralelo quanto abaixo;
fonte
Um método de extensão para isso que utiliza o SemaphoreSlim e também permite definir o grau máximo de paralelismo
Uso da amostra:
fonte
Depois de introduzir vários métodos auxiliares, você poderá executar consultas paralelas com esta sintaxe simples:
O que acontece aqui é: dividimos a coleção de fontes em 10 partes (
.Split(DegreeOfParallelism)
), depois executamos 10 tarefas, cada uma processando seus itens um por um (.SelectManyAsync(...)
) e mesclando-os novamente em uma única lista.Vale ressaltar que existe uma abordagem mais simples:
Mas ele precisa de uma precaução : se você tiver uma coleção de fontes muito grande, ela agendará um
Task
para cada item imediatamente, o que pode causar impactos significativos no desempenho.Os métodos de extensão usados nos exemplos acima são os seguintes:
fonte