Por que os assuntos não são recomendados nas extensões reativas do .NET?

111

No momento, estou começando a me familiarizar com a estrutura Reactive Extensions para .NET e estou trabalhando nos vários recursos de introdução que encontrei (principalmente http://www.introtorx.com )

Nosso aplicativo envolve uma série de interfaces de hardware que detectam quadros de rede, esses serão meus IObservables, então tenho uma variedade de componentes que consumirão esses quadros ou realizarão alguma forma de transformação nos dados e produzirão um novo tipo de quadro. Haverá também outros componentes que precisam ser exibidos a cada enésima moldura, por exemplo. Estou convencido de que Rx será útil para nossa aplicação, mas estou lutando com os detalhes de implementação para a interface IObserver.

A maioria (senão todos) dos recursos que tenho lido disseram que não devo implementar a interface IObservable sozinho, mas usar uma das funções ou classes fornecidas. De minha pesquisa, parece que a criação de um Subject<IBaseFrame>forneceria o que eu preciso, eu teria meu único thread que lê dados da interface de hardware e, em seguida, chama a função OnNext da minha Subject<IBaseFrame>instância. Os diferentes componentes IObserver, então, receberiam suas notificações desse Assunto.

Minha confusão vem do conselho dado no apêndice deste tutorial, onde diz:

Evite o uso dos tipos de assunto. Rx é efetivamente um paradigma de programação funcional. Usar assuntos significa que agora estamos gerenciando o estado, que é potencialmente mutante. Lidar com o estado mutante e a programação assíncrona ao mesmo tempo é muito difícil de acertar. Além disso, muitos dos operadores (métodos de extensão) foram cuidadosamente escritos para garantir que a vida útil correta e consistente das assinaturas e sequências seja mantida; quando você apresenta assuntos, você pode quebrar isso. Versões futuras também podem sofrer degradação significativa de desempenho se você usar assuntos explicitamente.

Meu aplicativo é bastante crítico para o desempenho, obviamente vou testar o desempenho de usar os padrões Rx antes de entrar no código de produção; no entanto, estou preocupado por estar fazendo algo que vai contra o espírito do framework Rx usando a classe Subject e que uma versão futura do framework vá prejudicar o desempenho.

Existe uma maneira melhor de fazer o que eu quero? O encadeamento de sondagem de hardware será executado continuamente, haja observadores ou não (o buffer de HW fará backup de outra forma), portanto, esta é uma sequência muito quente. Preciso então passar os quadros recebidos para vários observadores.

Qualquer conselho seria muito apreciado.

Anthony
fonte
1
Realmente ajudou na minha compreensão do assunto, estou apenas entendendo as coisas na minha cabeça sobre como usá-lo em meu aplicativo. Eu sei que eles são a coisa certa - eu tenho um pipeline de componentes que são muito orientados por push e preciso fazer todos os tipos de filtragem e invocação no thread de IU para exibir em uma GUI, bem como armazenar em buffer o último quadro recebido etc. etc - Eu só preciso ter certeza de que estou fazendo certo da primeira vez!
Anthony

Respostas:

70

Ok, se ignorarmos meus modos dogmáticos e ignorarmos "assuntos são bons / maus" todos juntos. Vejamos o espaço do problema.

Aposto que você tem 1 ou 2 estilos de sistema para os quais precisa se integrar.

  1. O sistema gera um evento ou um retorno de chamada quando uma mensagem chega
  2. Você precisa pesquisar o sistema para ver se há alguma mensagem para processar

Para a opção 1, fácil, apenas envolvemos com o método FromEvent apropriado e pronto. Para o pub!

Para a opção 2, agora precisamos considerar como fazemos a votação e como fazer isso com eficiência. Além disso, quando obtemos o valor, como o publicamos?

Eu imagino que você gostaria de um tópico dedicado para votação. Você não gostaria que outro programador martelasse o ThreadPool / TaskPool e o deixasse em uma situação de fome de ThreadPool. Alternativamente, você não quer o incômodo de troca de contexto (eu acho). Portanto, suponha que temos nosso próprio thread, provavelmente teremos algum tipo de loop While / Sleep que sentaremos para pesquisar. Quando a verificação encontra algumas mensagens, nós as publicamos. Bem, tudo isso parece perfeito para Observable.Create. Agora provavelmente não podemos usar um loop While, pois isso não nos permitirá retornar um Disposable para permitir o cancelamento. Felizmente, você leu o livro inteiro, por isso está familiarizado com a programação recursiva!

Eu imagino que algo assim poderia funcionar. #Não testado

public class MessageListener
{
    private readonly IObservable<IMessage> _messages;
    private readonly IScheduler _scheduler;

    public MessageListener()
    {
        _scheduler = new EventLoopScheduler();

        var messages = ListenToMessages()
                                    .SubscribeOn(_scheduler)
                                    .Publish();

        _messages = messages;
        messages.Connect();
    }

    public IObservable<IMessage> Messages
    {
        get {return _messages;}
    }

    private IObservable<IMessage> ListenToMessages()
    {
        return Observable.Create<IMessage>(o=>
        {
                return _scheduler.Schedule(recurse=>
                {
                    try
                    {           
                        var messages = GetMessages();
                        foreach (var msg in messages)
                        {
                            o.OnNext(msg);
                        }   
                        recurse();
                    }
                    catch (Exception ex)
                    {
                        o.OnError(ex);
                    }                   
                });
        });
    }

    private IEnumerable<IMessage> GetMessages()
    {
         //Do some work here that gets messages from a queue, 
         // file system, database or other system that cant push 
         // new data at us.
         // 
         //This may return an empty result when no new data is found.
    }
}

A razão pela qual eu realmente não gosto de Assuntos é que geralmente é o caso do desenvolvedor não ter um design claro sobre o problema. Invadir um assunto, cutucar aqui, ali e em todos os lugares, e então deixar o desenvolvedor de suporte pobre adivinhar que WTF estava acontecendo. Quando você usa os métodos Criar / Gerar etc., você localiza os efeitos na sequência. Você pode ver tudo em um método e saber que ninguém mais está causando um efeito colateral desagradável. Se eu vir um campo de assunto, agora tenho que procurar todos os lugares em uma classe em que ele está sendo usado. Se algum MFer expõe um publicamente, então todas as apostas estão canceladas, quem sabe como essa sequência está sendo usada! Assíncrono / Concorrência / Rx é difícil. Você não precisa tornar as coisas mais difíceis, permitindo que os efeitos colaterais e a programação de causalidade girem ainda mais sua cabeça.

Lee Campbell
fonte
10
Acabei de ler esta resposta agora, mas achei que deveria apontar que nunca consideraria expor a interface do Assunto! Estou usando-o para fornecer a implementação IObservable <> em uma classe lacrada (que expõe o IObservable <>). Eu posso definitivamente ver por que expor a interface do Assunto <> seria uma coisa ruim ™
Anthony
ei, desculpe ser grosso, mas eu realmente não entendo seu código. o que ListenToMessages () e GetMessages () estão fazendo e retornando?
user10479 01 de
1
Para o seu projeto pessoal @jeromerg, isso pode ser bom. No entanto, em minha experiência, os desenvolvedores lutam com WPF, MVVM, design de GUI de teste de unidade e, em seguida, lançar Rx pode tornar as coisas mais complicadas. Tentei o padrão BehaviourSubject-as-a-property. No entanto, descobri que seria muito mais adotável para outros se usássemos propriedades INPC padrão e, em seguida, usássemos um método de extensão simples para converter isso para IObservable. Além disso, você precisará de ligações WPF personalizadas para trabalhar com seus assuntos de comportamento. Agora sua pobre equipe tem que aprender WPF, MVVM, Rx e seu novo framework também.
Lee Campbell
2
@LeeCampbell, para colocá-lo em termos de seu exemplo de código, a maneira normal seria que MessageListener fosse construído pelo sistema (você provavelmente registra o nome da classe de alguma forma) e é informado de que o sistema irá então chamar OnCreate () e OnGoodbye () e chamará message1 (), message2 () e message3 () conforme as mensagens forem geradas. Parece que messageX [123] chamaria OnNext sobre um assunto, mas existe uma maneira melhor?
James Moore,
1
@JamesMoore, pois essas coisas são muito mais fáceis de explicar com exemplos concretos. Se você conhece um aplicativo de código aberto para Android que usa Rx e Assuntos, talvez eu consiga encontrar tempo para ver se posso fornecer uma maneira melhor. Eu entendo que não é muito útil ficar em um pedestal e dizer que os assuntos são ruins. Mas acho que coisas como IntroToRx, RxCookbook e ReactiveTrader dão vários níveis de exemplo de como usar Rx.
Lee Campbell
38

Em geral, você deve evitar o uso Subject, no entanto, para o que está fazendo aqui, acho que funcionam muito bem. Eu fiz uma pergunta semelhante quando me deparei com a mensagem "evitar assuntos" nos tutoriais Rx.

Para citar Dave Sexton (de Rxx)

"Assuntos são os componentes stateful do Rx. Eles são úteis quando você precisa criar um observável semelhante a um evento como um campo ou uma variável local."

Costumo usá-los como ponto de entrada no Rx. Portanto, se eu tiver algum código que precise dizer 'algo aconteceu' (como você), usaria um Subjecte chamaria OnNext. Em seguida, exponha isso como uma IObservableassinatura de outros (você pode usarAsObservable() assunto para que em seu assunto para garantir que ninguém possa transmitir para um assunto e bagunçar as coisas).

Você também pode conseguir isso com um evento .NET e usar FromEventPattern, mas se eu só vou transformar o evento em um IObservable, não vejo a vantagem de ter um evento em vez de umSubject (o que pode significar que estou perdendo algo aqui)

No entanto, o que você deve evitar fortemente é assinar um IObservablecom a Subject, ou seja, não passe um Subjectpara o IObservable.Subscribemétodo.

Wilka
fonte
Por que você precisa de estado afinal? Conforme mostrado pela minha resposta, se você dividir o problema em partes separadas, você realmente não precisa gerenciar o estado. Assuntos não devem ser usados ​​neste caso.
casperOne
8
@casperOne Você não precisa de um estado fora do Assunto <T> ou evento (ambos têm coleções de coisas para chamar, observadores ou manipuladores de eventos). Eu apenas prefiro usar um Assunto se a única razão para adicionar um evento for envolvê-lo com FromEventPattern. Além de uma mudança nos esquemas de exceções, que pode ser importante para você, não vejo nenhum benefício em evitar o Assunto dessa forma. Novamente, posso estar faltando algo mais aqui que o evento é preferível ao Assunto. A menção de estado era apenas parte da citação, e parecia melhor deixá-la. Talvez seja mais clara sem essa parte?
Wilka
@casperOne - mas você também não deve criar um evento apenas para envolvê-lo com FromEventPattern. Obviamente, é uma ideia terrível.
James Moore,
3
Expliquei minha citação com mais detalhes nesta postagem do blog .
Dave Sexton,
Costumo usá-los como ponto de entrada no Rx. Isso acertou em cheio para mim. Eu tenho uma situação em que há uma API que, quando invocada, gera eventos que gostaria de passar por um pipeline de processamento reativo. O Assunto foi a resposta para mim, já que o FromEventPattern não parece existir no RxJava AFAICT.
scorpiodawg
31

Freqüentemente, quando você está gerenciando um Assunto, você está na verdade apenas reimplementando recursos já no Rx, e provavelmente de uma forma não tão robusta, simples e extensível.

Quando você está tentando adaptar algum fluxo de dados assíncrono em Rx (ou criar um fluxo de dados assíncrono de um que não seja atualmente assíncrono), os casos mais comuns geralmente são:

  • A fonte dos dados é um evento : como diz Lee, este é o caso mais simples: use FromEvent e vá para o pub.

  • A fonte de dados é de uma operação síncrona e você deseja atualizações pesquisadas (por exemplo, um serviço da web ou chamada de banco de dados): Nesse caso, você pode usar a abordagem sugerida por Lee ou, para casos simples, pode usar algo como Observable.Interval.Select(_ => <db fetch>). Você pode usar DistinctUntilChanged () para evitar a publicação de atualizações quando nada foi alterado nos dados de origem.

  • A fonte de dados é algum tipo de api assíncrona que chama seu retorno de chamada : Neste caso, use Observable.Create para conectar seu retorno de chamada para chamar OnNext / OnError / OnComplete no observador.

  • A fonte de dados é uma chamada que bloqueia até que novos dados estejam disponíveis (por exemplo, algumas operações de leitura de soquete síncrono): Neste caso, você pode usar Observable.Create para envolver o código imperativo que lê do soquete e publica no Observer.OnNext quando os dados são lidos. Isso pode ser semelhante ao que você está fazendo com o Assunto.

Usar Observable.Create versus criar uma classe que gerencie um Assunto é bastante equivalente a usar a palavra-chave yield versus criar uma classe inteira que implementa IEnumerator. Claro, você pode escrever um IEnumerator para ser um cidadão tão limpo e bom quanto o código de produção, mas qual deles é melhor encapsulado e tem um design mais organizado? O mesmo é verdadeiro para Observable.Create vs gerenciar assuntos.

Observable.Create oferece um padrão limpo para configuração preguiçosa e desmontagem limpa. Como você consegue isso com uma classe envolvendo um assunto? Você precisa de algum tipo de método Start ... como saber quando chamá-lo? Ou você sempre começa, mesmo quando ninguém está ouvindo? E quando terminar, como você interromperá a leitura do soquete / pesquisa do banco de dados, etc? Você precisa ter algum tipo de método Stop e ainda ter acesso não apenas ao IObservable ao qual está inscrito, mas à classe que criou o Assunto em primeiro lugar.

Com Observable.Create, está tudo reunido em um só lugar. O corpo de Observable.Create não é executado até que alguém se inscreva, portanto, se ninguém se inscrever, você nunca usará seu recurso. E Observable.Create retorna um Disposable que pode encerrar de forma limpa seu recurso / callbacks, etc - isso é chamado quando o Observer cancela a inscrição. O tempo de vida dos recursos que você está usando para gerar o Observable está nitidamente ligado ao tempo de vida do próprio Observable.

Niall Connaughton
fonte
1
Explicação muito clara de Observable.Create. Obrigado!
Evan Moran
1
Ainda tenho casos em que uso um assunto, em que um objeto corretor expõe o observável (digamos que seja apenas uma propriedade mutável). Diferentes componentes chamarão o corretor informando quando essa propriedade muda (com uma chamada de método), e esse método faz um OnNext. Os consumidores se inscrevem. Acho que usaria um BehaviorSubject nesse caso, isso é apropriado?
Frank Schwieterman
1
Depende da situação. Um bom design Rx tende a transformar o sistema em uma arquitetura assíncrona / reativa. Pode ser difícil integrar de forma limpa pequenos componentes de código reativo com um sistema de design imperativo. A solução band-aid é usar assuntos para transformar ações imperativas (chamadas de função, conjuntos de propriedades) em eventos observáveis. Então você acaba com pequenos bolsões de código reativo e nenhum "aha!" momento. Alterar o design para modelar o fluxo de dados e reagir a ele geralmente oferece um design melhor, mas é uma mudança abrangente e requer uma mudança de mentalidade e adesão da equipe.
Niall Connaughton
1
Eu diria aqui (como Rx inexperiente) que: Usando assuntos você pode entrar no mundo de Rx dentro de um aplicativo imperativo crescido e lentamente transformá-lo. Também para ganhar as primeiras experiências ... e certamente depois mudar o seu código para como deveria ser desde o início (risos). Mas, para começar, acho que vale a pena usar temas.
Robetto
9

O texto do bloco citado explica muito por que você não deveria usar Subject<T> , mas para simplificar, você está combinando as funções de observador e observável, enquanto injeta algum tipo de estado no meio (seja encapsulando ou estendendo).

É aqui que você encontra problemas; essas responsabilidades devem ser separadas e distintas umas das outras.

Dito isto, na sua específica caso , recomendo que você divida suas preocupações em partes menores.

Primeiro, você tem seu tópico que está quente e sempre monitorando o hardware em busca de sinais para levantar notificações. Como você faria isso normalmente? Eventos . Então, vamos começar com isso.

Vamos definir o EventArgsque seu evento irá disparar.

// The event args that has the information.
public class BaseFrameEventArgs : EventArgs
{
    public BaseFrameEventArgs(IBaseFrame baseFrame)
    {
        // Validate parameters.
        if (baseFrame == null) throw new ArgumentNullException("IBaseFrame");

        // Set values.
        BaseFrame = baseFrame;
    }

    // Poor man's immutability.
    public IBaseFrame BaseFrame { get; private set; }
}

Agora, a classe que irá disparar o evento. Note, esta poderia ser uma classe estática (desde que você sempre têm um fio condutor monitorando o buffer de hardware), ou algo que você chamar sob demanda que subscreve que . Você terá que modificar isso conforme apropriado.

public class BaseFrameMonitor
{
    // You want to make this access thread safe
    public event EventHandler<BaseFrameEventArgs> HardwareEvent;

    public BaseFrameMonitor()
    {
        // Create/subscribe to your thread that
        // drains hardware signals.
    }
}

Então, agora você tem uma classe que expõe um evento. Os observáveis ​​funcionam bem com eventos. Tanto é assim que há suporte de primeira classe para converter fluxos de eventos (pense em um fluxo de eventos como vários disparos de um evento) em IObservable<T>implementações se você seguir o padrão de evento padrão, por meio do método estáticoFromEventPattern na Observableclasse .

Com a fonte de seus eventos e o FromEventPatternmétodo, podemos criar um IObservable<EventPattern<BaseFrameEventArgs>>facilmente (a EventPattern<TEventArgs>classe incorpora o que você veria em um evento .NET, notavelmente, uma instância derivada de EventArgse um objeto que representa o remetente), assim:

// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();

// Create the observable.  It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
    FromEventPattern<BaseFrameEventArgs>(
        h => source.HardwareEvent += h,
        h => source.HardwareEvent -= h);

Claro, você quer um IObservable<IBaseFrame>, mas é fácil, usando o Selectmétodo de extensão na Observableclasse para criar uma projeção (assim como faria no LINQ, e podemos resumir tudo isso em um método fácil de usar):

public IObservable<IBaseFrame> CreateHardwareObservable()
{
    // The event source.
    // Or you might not need this if your class is static and exposes
    // the event as a static event.
    var source = new BaseFrameMonitor();

    // Create the observable.  It's going to be hot
    // as the events are hot.
    IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
        FromEventPattern<BaseFrameEventArgs>(
            h => source.HardwareEvent += h,
            h => source.HardwareEvent -= h);

    // Return the observable, but projected.
    return observable.Select(i => i.EventArgs.BaseFrame);
}
casperOne
fonte
7
Obrigado por sua resposta @casperOne, esta foi minha abordagem inicial, mas parecia "errado" adicionar um evento apenas para que eu pudesse envolvê-lo com Rx. Atualmente, eu uso delegados (e sim, eu sei que isso é exatamente o que um evento é!) Para se ajustar ao código usado para carregar e salvar a configuração, isso deve ser capaz de reconstruir os pipelines de componentes e o sistema de delegados me deu mais flexibilidade. Rx está me dando dor de cabeça nessa área agora, mas o poder de tudo o mais no framework está fazendo com que a resolução do problema de configuração valha a pena.
Anthony
@Anthony Se você conseguir fazer seu exemplo de código funcionar, ótimo, mas, como comentei, não faz sentido. Quanto a sentir-se "errado", não sei por que subdividir as coisas em partes lógicas parece "errado", mas você não forneceu detalhes suficientes em sua postagem original para indicar a melhor forma de traduzir isso IObservable<T>como nenhuma informação sobre como você ' re sinalizando com que a informação é dada.
casperOne
@casperOne Em sua opinião, o uso de Assuntos seria apropriado para um Message Bus / Event Aggregator?
kitsune de
1
@kitsune Não, não vejo porque fariam isso. Se você está pensando em "otimização", precisa se perguntar se esse é ou não o problema, você mediu Rx como a causa do problema?
casperOne
2
Eu concordo aqui com casperOne que dividir as preocupações é uma boa ideia. Gostaria de salientar que, se você usar o padrão Hardware to Event to Rx, perderá a semântica de erro. Quaisquer conexões ou sessões perdidas, etc., não serão expostas ao consumidor. Agora o consumidor não pode decidir se deseja tentar novamente, desconectar, assinar outra sequência ou outra coisa.
Lee Campbell,
0

É ruim generalizar que os assuntos não são bons para usar em uma interface pública. Embora seja certamente verdade, que essa não é a forma como uma abordagem de programação reativa deveria parecer, é definitivamente uma boa opção de melhoria / refatoração para seu código clássico.

Se você tem uma propriedade normal com um acessador de conjunto público e deseja notificar sobre as alterações, não há nada contra substituí-la por um BehaviorSubject. O INPC ou outros eventos adicionais não são tão claros e pessoalmente me desanimam. Para este propósito, você pode e deve usar BehaviorSubjects como propriedades públicas em vez de propriedades normais e dispensar INPC ou outros eventos.

Além disso, a interface de assunto torna os usuários de sua interface mais cientes sobre a funcionalidade de suas propriedades e são mais propensos a se inscrever em vez de apenas obter o valor.

É melhor usar se você quiser que outras pessoas ouçam / assinem as alterações de uma propriedade.

Felix Keil
fonte