Eu gostaria de await
no resultado de BlockingCollection<T>.Take()
forma assíncrona, então eu não bloqueio o segmento. Procurando por algo assim:
var item = await blockingCollection.TakeAsync();
Eu sei que poderia fazer isso:
var item = await Task.Run(() => blockingCollection.Take());
mas isso meio que acaba com a ideia toda, porque outro thread (de ThreadPool
) é bloqueado.
Existe alguma alternativa?
await Task.Run(() => blockingCollection.Take())
a tarefa será executada em outro thread e seu thread de IU não será bloqueado. Não é esse o ponto?Task
API baseada em exportação de biblioteca . Ele pode ser usado a partir do ASP.NET, por exemplo. O código em questão não escalaria bem ali.ConfigureAwait
fosse usado após oRun()
? [ed. deixa pra lá, entendo o que você está dizendo agora]Respostas:
Existem quatro alternativas que eu conheço.
O primeiro é Canais , que fornece uma fila multitarefa que suportes assíncrona
Read
eWrite
operações. Os canais são altamente otimizados e, opcionalmente, suportam a eliminação de alguns itens se um limite for atingido.O próximo é
BufferBlock<T>
do TPL Dataflow . Se você tiver apenas um consumidor, poderá usarOutputAvailableAsync
ouReceiveAsync
, ou apenas vinculá-lo a umActionBlock<T>
. Para mais informações, consulte meu blog .Os dois últimos são tipos que criei, disponíveis em minha biblioteca AsyncEx .
AsyncCollection<T>
éasync
quase equivalente aBlockingCollection<T>
, capaz de envolver uma coleção de produtor / consumidor simultânea, comoConcurrentQueue<T>
ouConcurrentBag<T>
. Você pode usarTakeAsync
para consumir itens da coleção de maneira assíncrona. Para mais informações, consulte meu blog .AsyncProducerConsumerQueue<T>
é umaasync
fila de produtor / consumidor compatível com mais portabilidade . Você pode usarDequeueAsync
para consumir itens da fila de maneira assíncrona. Para mais informações, consulte meu blog .As três últimas alternativas permitem puts e take síncronos e assíncronos.
fonte
AsyncCollection.TryTakeAsync
, mas não consigo encontrar no downloadNito.AsyncEx.Coordination.dll 5.0.0.0
(versão mais recente). O Nito.AsyncEx.Concurrent.dll referenciado não existe no pacote . o que estou perdendo?while ((result = await collection.TryTakeAsync()).Success) { }
. Por que foi removido?... ou você pode fazer isso:
using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; public class AsyncQueue<T> { private readonly SemaphoreSlim _sem; private readonly ConcurrentQueue<T> _que; public AsyncQueue() { _sem = new SemaphoreSlim(0); _que = new ConcurrentQueue<T>(); } public void Enqueue(T item) { _que.Enqueue(item); _sem.Release(); } public void EnqueueRange(IEnumerable<T> source) { var n = 0; foreach (var item in source) { _que.Enqueue(item); n++; } _sem.Release(n); } public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken)) { for (; ; ) { await _sem.WaitAsync(cancellationToken); T item; if (_que.TryDequeue(out item)) { return item; } } } }
Fila FIFO assíncrona simples e totalmente funcional.
fonte
for
? se o semáforo for liberado, a fila tem pelo menos um item para retirar da fila, não?TryDequeue
são, retorna com um valor ou não retorna de todo. Tecnicamente, se você tiver mais de 1 leitor, o mesmo leitor pode consumir dois (ou mais) itens antes que qualquer outro leitor esteja totalmente desperto. Um sucessoWaitAsync
é apenas um sinal de que pode haver itens na fila para consumir, não é uma garantia.If the value of the CurrentCount property is zero before this method is called, the method also allows releaseCount threads or tasks blocked by a call to the Wait or WaitAsync method to enter the semaphore.
de docs.microsoft.com/en-us/dotnet/api/… Como é bem-sucedidoWaitAsync
não ter itens na fila? Se a liberação de N despertar mais do que N consumidores do quesemaphore
está quebrado. Não é?Aqui está uma implementação muito básica de um
BlockingCollection
que oferece suporte à espera, com muitos recursos ausentes. Ele usa aAsyncEnumerable
biblioteca, que torna a enumeração assíncrona possível para versões C # anteriores a 8.0.public class AsyncBlockingCollection<T> { // Missing features: cancellation, boundedCapacity, TakeAsync private Queue<T> _queue = new Queue<T>(); private SemaphoreSlim _semaphore = new SemaphoreSlim(0); private int _consumersCount = 0; private bool _isAddingCompleted; public void Add(T item) { lock (_queue) { if (_isAddingCompleted) throw new InvalidOperationException(); _queue.Enqueue(item); } _semaphore.Release(); } public void CompleteAdding() { lock (_queue) { if (_isAddingCompleted) return; _isAddingCompleted = true; if (_consumersCount > 0) _semaphore.Release(_consumersCount); } } public IAsyncEnumerable<T> GetConsumingEnumerable() { lock (_queue) _consumersCount++; return new AsyncEnumerable<T>(async yield => { while (true) { lock (_queue) { if (_queue.Count == 0 && _isAddingCompleted) break; } await _semaphore.WaitAsync(); bool hasItem; T item = default; lock (_queue) { hasItem = _queue.Count > 0; if (hasItem) item = _queue.Dequeue(); } if (hasItem) await yield.ReturnAsync(item); } }); } }
Exemplo de uso:
var abc = new AsyncBlockingCollection<int>(); var producer = Task.Run(async () => { for (int i = 1; i <= 10; i++) { await Task.Delay(100); abc.Add(i); } abc.CompleteAdding(); }); var consumer = Task.Run(async () => { await abc.GetConsumingEnumerable().ForEachAsync(async item => { await Task.Delay(200); await Console.Out.WriteAsync(item + " "); }); }); await Task.WhenAll(producer, consumer);
Resultado:
Atualização: com o lançamento do C # 8, a enumeração assíncrona se tornou um recurso de linguagem integrado. As classes necessárias (
IAsyncEnumerable
,IAsyncEnumerator
) são incorporadas ao .NET Core 3.0 e são oferecidas como um pacote para .NET Framework 4.6.1+ ( Microsoft.Bcl.AsyncInterfaces ).Aqui está uma
GetConsumingEnumerable
implementação alternativa , apresentando a nova sintaxe C # 8:public async IAsyncEnumerable<T> GetConsumingEnumerable() { lock (_queue) _consumersCount++; while (true) { lock (_queue) { if (_queue.Count == 0 && _isAddingCompleted) break; } await _semaphore.WaitAsync(); bool hasItem; T item = default; lock (_queue) { hasItem = _queue.Count > 0; if (hasItem) item = _queue.Dequeue(); } if (hasItem) yield return item; } }
Observe a coexistência de
await
eyield
no mesmo método.Exemplo de uso (C # 8):
var consumer = Task.Run(async () => { await foreach (var item in abc.GetConsumingEnumerable()) { await Task.Delay(200); await Console.Out.WriteAsync(item + " "); } });
Observe o
await
antes deforeach
.fonte
AsyncBlockingCollection
faz sentido. Algo não pode ser assíncrono e bloqueador ao mesmo tempo, pois esses dois conceitos são exatamente opostos!Se você não se importa em um pequeno hack, pode tentar essas extensões.
public static async Task AddAsync<TEntity>( this BlockingCollection<TEntity> Bc, TEntity item, CancellationToken abortCt) { while (true) { try { if (Bc.TryAdd(item, 0, abortCt)) return; else await Task.Delay(100, abortCt); } catch (Exception) { throw; } } } public static async Task<TEntity> TakeAsync<TEntity>( this BlockingCollection<TEntity> Bc, CancellationToken abortCt) { while (true) { try { TEntity item; if (Bc.TryTake(out item, 0, abortCt)) return item; else await Task.Delay(100, abortCt); } catch (Exception) { throw; } } }
fonte