O IObserver do .NET era destinado à assinatura de vários IObservables?

9

Existem interfaces IObservable e IObserver no .NET (também aqui e aqui ). Curiosamente, a implementação concreta do IObserver não mantém uma referência direta ao IObservable. Não sabe em quem está inscrito. Só pode invocar o cancelador de assinatura. "Por favor, puxe o alfinete para cancelar a inscrição."

edit: O cancelador de assinatura implementa o IDisposable. Eu acho que esse esquema foi empregado para evitar o problema do ouvinte caducado .

Duas coisas ainda não estão totalmente claras para mim.

  1. A classe Unsubscriber interna fornece o comportamento de assinar e esquecer? Quem (e quando exatamente) chama IDisposable.Dispose()o Cancelador de assinatura? O coletor de lixo (GC) não é determinístico.
    [Aviso: no geral, passei mais tempo com C e C ++ do que com C #.]
  2. O que deve acontecer se eu quiser inscrever um observador K em um L1 observável e o observador já estiver inscrito em algum outro L2 observável?

    K.Subscribe(L1);
    K.Subscribe(L2);
    K.Unsubscribe();
    L1.PublishObservation(1003);
    L2.PublishObservation(1004);
    

    Quando executei esse código de teste no exemplo do MSDN, o observador permaneceu inscrito no L1. Isso seria peculiar no desenvolvimento real. Potencialmente, existem 3 caminhos para melhorar isso:

    • Se o observador já tiver uma instância de cancelamento de inscrição (ou seja, já está inscrita), será silenciosamente cancelada a inscrição no provedor original antes de assinar uma nova. Essa abordagem oculta o fato de não ser mais assinada pelo provedor original, o que pode se tornar uma surpresa mais tarde.
    • Se o observador já tiver uma instância de cancelamento de inscrição, será lançada uma exceção. Um código de chamada bem comportado deve cancelar a assinatura do observador explicitamente.
    • O Observer assina vários fornecedores. Essa é a opção mais intrigante, mas isso pode ser implementado com IObservable e IObserver? Vamos ver. É possível para o observador manter uma lista de objetos de cancelamento de assinatura: um para cada fonte. Infelizmente, IObserver.OnComplete()não fornece uma referência ao provedor que a enviou. Portanto, a implementação do IObserver com vários provedores não seria capaz de determinar de qual deles cancelar a inscrição.
  3. O IObserver do .NET pretendia se inscrever em vários IObservables?
    A definição de livro didático do padrão de observador exige que um observador possa se inscrever em vários provedores? Ou é opcional e depende da implementação?

Nick Alexeev
fonte

Respostas:

5

As duas interfaces são na verdade parte das Extensões Reativas (Rx, abreviado); você deve usar essa biblioteca praticamente sempre que quiser usá-las.

As interfaces estão tecnicamente em mscrolib, não em nenhum dos assemblies Rx. Eu acho que isso é para facilitar a interoperabilidade: dessa forma, bibliotecas como o TPL Dataflow podem fornecer membros que trabalham com essas interfaces , sem fazer referência ao Rx.

Se você usar Rxs Subjectcomo sua implementação IObservable, Subscriberetornará um IDisposableque pode ser usado para cancelar a inscrição:

var observable = new Subject<int>();

var unsubscriber =
    observable.Subscribe(Observer.Create<int>(i => Console.WriteLine("1: {0}", i)));
observable.Subscribe(Observer.Create<int>(i => Console.WriteLine("2: {0}", i)));

unsubscriber.Dispose();

observable.OnNext(1003);
observable.OnNext(1004);
svick
fonte
5

Apenas para esclarecer algumas coisas que estão bem documentadas nas Diretrizes de Design oficiais da Rx e em detalhes no meu site IntroToRx.com :

  • Você não depende do GC para limpar suas assinaturas. Coberto em detalhes aqui
  • Não existe Unsubscribemétodo. Você assina uma sequência observável e recebe uma assinatura . Você pode descartar essa assinatura indicando que não deseja mais chamar seus retornos de chamada.
  • Uma sequência observável não pode ser concluída mais de uma vez (consulte a seção 4 das Diretrizes de design da Rx).
  • Existem inúmeras maneiras de consumir várias seqüências observáveis. Também há muitas informações sobre isso no Reactivex.io e novamente no IntroToRx.

Para ser específico e responder diretamente à pergunta original, seu uso está voltado para a frente. Você não insere muitas seqüências observáveis ​​em um único observador. Você compõe seqüências observáveis ​​em uma única sequência observável. Você então se inscreve nessa sequência única.

Ao invés de

K.Subscribe(L1);
K.Subscribe(L2);
K.Unsubscribe();
L1.PublishObservation(1003);
L2.PublishObservation(1004);

Que é apenas pseudo-código e não funcionaria na implementação do .NET do Rx, faça o seguinte:

var source1 = new Subject<int>(); //was L1
var source2 = new Subject<int>(); //was L2

var subscription = source1
    .Merge(source2)
    .Subscribe(value=>Console.WriteLine("OnNext({0})", value));


source1.OnNext(1003);
source2.OnNext(1004);

subscription.Dispose();

Agora, isso não se encaixa exatamente na pergunta inicial, mas não sei o que K.Unsubscribe()deveria fazer (cancelar a assinatura de todas, a última ou a primeira assinatura ?!)

Lee Campbell
fonte
Posso simplesmente envolver o objeto de assinatura em um bloco "using"?
Robert Oschler
11
Nesse caso síncrono, você pode, no entanto, o Rx deve ser assíncrono. No caso assíncrono, você normalmente não pode usar o usingbloco. O custo para uma declaração de inscrição deve ser praticamente zero, assim você iria NETER o bloco usando, subscrever, deixar o bloco usando (assim unsubscribe), tornando o código bastante inútil
Lee Campbell
3

Você está certo. O exemplo funciona mal para vários IObservables.

Eu acho que OnComplete () não fornece uma referência de volta porque eles não querem que o IObservable tenha que mantê-lo por perto. Se eu estivesse escrevendo, provavelmente daria suporte a várias assinaturas fazendo com que o Subscribe recebesse um identificador como um segundo parâmetro, que é passado de volta à chamada OnComplete (). Então você poderia dizer

K.Subscribe(L1,"L1")
K.Subscribe(L2,"L2")
K.Unsubscribe("L1")

Tal como está, parece que o .NET IObserver não é adequado para vários observadores. Mas suponho que seu objeto principal (LocationReporter no exemplo) possa ter

public Dictionary<String,IObserver> Observers;

e isso permitiria que você apoiasse

K.Subscribe(L1,"L1")
K.Subscribe(L2,"L2")
K.Unsubscribe("L1")

também.

Suponho que a Microsoft possa argumentar que, portanto, não há necessidade de oferecer suporte direto a vários IObservables nas interfaces.

psr
fonte
Eu também estava pensando que a implementação observável pode ter uma lista de observadores. Eu também notei que IObserver.OnComplete()não identifica de quem é a ligação. Se o observador estiver inscrito em mais de um observável, não saberá de quem cancelar a inscrição. Anticlimático. Eu me pergunto, o .NET tem uma interface melhor para o padrão de observador?
22680 Nick Ellenev
Se você quiser ter uma referência a algo, você deve realmente usar uma referência, não uma string.
Svick
Essa resposta me ajudou com um bug da vida real. Eu estava usando Observable.Create()para criar um observável e encadeando vários observáveis ​​de origem nele Subscribe(). Eu passei inadvertidamente um observável concluído em um caminho de código. Isso completou meu observável recém-criado, mesmo que as outras fontes não estivessem completas. Tomou-me idades para descobrir o que eu precisava fazer - interruptor Observable.Empty()para Observable.Never().
Olly #
0

Eu sei que é muito tarde para a festa, mas ...

As interfaces I Observable<T>e nãoIObserver<T> fazem parte do Rx ... são tipos principais ... mas o Rx faz uso extensivo delas.

Você é livre para ter quantos (ou poucos) observadores quiser. Se você antecipar vários observadores, é responsabilidade do observável encaminhar as OnNext()chamadas para os observadores apropriados para cada evento observado. O observável pode precisar de uma lista ou um dicionário, como você sugere.

Existem bons casos para permitir apenas um - e bons casos para permitir muitos. Por exemplo, em uma implementação do CQRS / ES, você pode impor um único manipulador de comando por tipo de comando em um barramento de comando, enquanto pode notificar várias transformações do lado de leitura para um determinado tipo de evento no armazenamento de eventos.

Como afirmado em outras respostas, não há Unsubscribe. Descartando o que você recebe quando Subscribegeralmente faz o trabalho sujo. O observador, ou um agente do mesmo, é responsável por manter o token até que ele não queira mais receber notificações adicionais . (pergunta 1)

Então, no seu exemplo:

K.Subscribe(L1);
K.Subscribe(L2);
K.Unsubscribe();
L1.PublishObservation(1003);
L2.PublishObservation(1004);

... seria mais como:

using ( var l1Token = K.Subscribe( L1 ) )
{
  using ( var l2Token = K.Subscribe( L2 );
  {
    L1.PublishObservation( 1003 );
    L2.PublishObservation( 1004 );
  } //--> effectively unsubscribing to L2 here

  L2.PublishObservation( 1005 );
}

... onde K ouviria 1003 e 1004, mas não 1005.

Para mim, isso ainda parece engraçado porque, nominalmente, as assinaturas são coisas de vida longa ... geralmente pela duração do programa. Eles não são diferentes nesse aspecto dos eventos .Net normais.

Em muitos exemplos que já vi, o Disposetoken funciona para remover o observador da lista de observadores do observável. Prefiro que o token não tenha tanto conhecimento ... e, por isso, generalizei meus tokens de assinatura para chamar apenas um lambda passado (com informações de identificação capturadas no momento da assinatura:

public class SubscriptionToken<T>: IDisposable
{
  private readonly Action unsubscribe;

  private SubscriptionToken( ) { }
  public SubscriptionToken( Action unsubscribe )
  {
    this.unsubscribe = unsubscribe;
  }

  public void Dispose( )
  {
    unsubscribe( );
  }
}

... e o observável pode instalar o comportamento de cancelamento de assinatura durante a assinatura:

IDisposable Subscribe<T>( IObserver<T> observer )
{
  var subscriberId = Guid.NewGuid( );
  subscribers.Add( subscriberId, observer );

  return new SubscriptionToken<T>
  (
    ( ) =>
    subscribers.Remove( subscriberId );
  );
}

Se o seu observador estiver capturando eventos de vários observáveis, convém garantir que haja algum tipo de informação de correlação nos próprios eventos ... como os eventos .Net fazem com o sender. Cabe a você se isso importa ou não. Não está cozido, como você corretamente argumentou. (Questão 3)

Argila
fonte