Eu estou tentando enumerar uma grande IEnumerable
uma vez, e observar a enumeração com vários operadores ligados ( Count
, Sum
, Average
etc). A maneira óbvia é transformá-lo em um IObservable
com o método ToObservable
e, em seguida, inscrever um observador nele. Notei que isso é muito mais lento que outros métodos, como fazer um loop simples e notificar o observador em cada iteração ou usar o Observable.Create
método em vez de ToObservable
. A diferença é substancial: é 20 a 30 vezes mais lenta. É o que é, ou estou fazendo algo errado?
using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
public static class Program
{
static void Main(string[] args)
{
const int COUNT = 10_000_000;
Method1(COUNT);
Method2(COUNT);
Method3(COUNT);
}
static void Method1(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static void Method2(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
foreach (var item in source) subject.OnNext(item);
subject.OnCompleted();
Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static void Method3(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
Observable.Create<int>(o =>
{
foreach (var item in source) o.OnNext(item);
o.OnCompleted();
return Disposable.Empty;
}).Subscribe(subject);
Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
}
Resultado:
ToObservable: 7,576 msec
Loop & Notify: 273 msec
Observable.Create: 511 msec
.NET Core 3.0, C # 8, System.Reactive 4.3.2, Windows 10, Aplicativo de console, Versão compilada
Atualização: Aqui está um exemplo da funcionalidade real que desejo obter:
var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject<long>();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");
Resultado:
Contagem: 10.000.000, Soma: 49.999.995.000.000, Média: 4.999.999,5
A diferença importante dessa abordagem em comparação ao uso de operadores LINQ padrão é que o enumerável de origem é enumerado apenas uma vez.
Mais uma observação: o uso ToObservable(Scheduler.Immediate)
é um pouco mais rápido (cerca de 20%) do que ToObservable()
.
fonte
Respostas:
Essa é a diferença entre um observável bem comportado e um observador do tipo "faça você mesmo porque você pensa que mais rápido é melhor, mas não é".
Quando você mergulha o suficiente na fonte, descobre esta pequena e adorável linha:
Ele está efetivamente chamando
hasNext = enumerator.MoveNext();
uma vez por iteração recursiva agendada.Isso permite que você escolha o agendador para sua
.ToObservable(schedulerOfYourChoice)
chamada.Com as outras opções que você escolheu, você criou uma série de chamadas simples para
.OnNext
praticamente não fazer nada.Method2
nem sequer tem uma.Subscribe
ligação.Ambos
Method2
eMethod1
executado usando o segmento atual e ambos executado até sua conclusão antes da subscrição está terminado. Eles estão bloqueando chamadas. Eles podem causar condições de corrida.Method1
é o único que se comporta bem como observável. É assíncrono e pode ser executado independentemente do assinante.Lembre-se de que observáveis são coleções executadas ao longo do tempo. Eles normalmente têm uma fonte assíncrona ou um timer ou respondem a estímulos externos. Eles não costumam ficar sem enumeráveis. Se você estiver trabalhando com um enumerável, espera-se que o trabalho síncrono seja executado mais rapidamente.
Velocidade não é o objetivo de Rx. Realizar consultas complexas em valores enviados com base no tempo é o objetivo.
fonte
source.Aggregate(new { count = 0, sum = 0L }, (a, x) => new { count = a.count + 1, sum = a.sum + x }, a => new { a.count, a.sum, average = (double)a.sum / a.count })
. Apenas uma iteração e mais de 10x mais rápido que o Rx.ToObservable
). Este é o outro extremo, onde tenho o melhor desempenho, mas sou forçado a reimplementar todos os operadores LINQ dentro de uma expressão lambda complexa. É propenso a erros e menos sustentável, considerando que meus cálculos reais envolvem ainda mais operadores e combinações deles. Acho muito tentador pagar um preço de desempenho x2 por ter uma solução clara e legível. Por outro lado, pagando x10 ou x20, nem tanto!Porque o Sujeito não faz nada.
Parece que o desempenho da instrução loop é diferente em 2 casos:
ou
Se usar outro Assunto, com uma implementação OnNext lenta, o resultado será mais aceitável
Resultado
O suporte ToObservable System.Reactive.Concurrency.IScheduler
Isso significa que você pode implementar seu próprio IScheduler e decidir quando executar cada tarefa
Espero que isto ajude
Saudações
fonte
subject
é observado por outros operadores que executam cálculos, por isso não está fazendo nada. A penalidade de desempenho do usoToObservable
ainda é substancial, porque os cálculos são muito leves.