RxJS: Como eu "manualmente" atualizava um Observable?

139

Acho que devo estar entendendo mal algo fundamental, porque, na minha opinião, esse deve ser o caso mais básico para um observável, mas, durante toda a minha vida, não consigo descobrir como fazê-lo a partir dos documentos.

Basicamente, eu quero ser capaz de fazer isso:

// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer){});
var observer = eventObservable.subscribe(
   function(x){
     console.log('next: ' + x);
   }
...
var my_function = function(){
  eventObservable.push('foo'); 
  //'push' adds an event to the datastream, the observer gets it and prints 
  // next: foo
}

Mas não consegui encontrar um método parecido push. Estou usando isso para um manipulador de cliques, e sei que eles têm Observable.fromEventpara isso, mas estou tentando usá-lo com o React e preferiria simplesmente atualizar o fluxo de dados em um retorno de chamada, em vez de usar um método completamente diferente sistema de manipulação de eventos. Então, basicamente, eu quero isso:

$( "#target" ).click(function(e) {
  eventObservable.push(e.target.text()); 
});

O mais próximo que cheguei foi do uso observer.onNext('foo'), mas isso não parecia realmente funcionar e isso é chamado pelo observador, o que não parece certo. O observador deve reagir ao fluxo de dados, não alterá-lo, certo?

Eu simplesmente não entendo a relação observador / observável?

LiamD
fonte
Dê uma olhada nisto para esclarecer sua ideia (a introdução à Programação Reativa que você está perdendo): gist.github.com/staltz/868e7e9bc2a7b8c1f754 . Aqui também há vários
user3743222 25/15/15
Eu verifiquei o primeiro, parece um recurso sólido. O segundo é uma ótima lista, encontrei aaronstacy.com/writings/reactive-programming-and-mvc que me ajudou a descobrir o Rx.Subject, que resolve meu problema. Então obrigado! Depois de escrever um pouco mais do aplicativo, postarei minha solução, apenas quero testá-la um pouco.
LiamD 25/10
Hehe, muito obrigado por fazer esta pergunta, eu estava prestes a fazer a mesma pergunta com o mesmo exemplo de código em mente :-)
arturh

Respostas:

154

No RX, Observer e Observable são entidades distintas. Um observador assina um Observable. Um Observável emite itens para seus observadores chamando os métodos dos observadores. Se você precisar chamar os métodos observadores fora do escopo deObservable.create() poderá usar um Assunto, que é um proxy que atua como observador e Observável ao mesmo tempo.

Você pode fazer assim:

var eventStream = new Rx.Subject();

var subscription = eventStream.subscribe(
   function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

var my_function = function() {
  eventStream.next('foo'); 
}

Você pode encontrar mais informações sobre assuntos aqui:

luisgabriel
fonte
1
Isso é exatamente o que acabei fazendo! Continuei trabalhando para encontrar uma maneira melhor de fazer o que precisava, mas essa é definitivamente uma solução viável. Vi pela primeira vez neste artigo: aaronstacy.com/writings/reactive-programming-and-mvc .
LiamD 27/10/2015
34

Eu acredito Observable.create()que não toma um observador como parâmetro de retorno de chamada, mas um emissor. Portanto, se você deseja adicionar um novo valor ao seu Observable, tente o seguinte:

var emitter;
var observable = Rx.Observable.create(e => emitter = e);
var observer = {
  next: function(next) {
    console.log(next);
  },
  error: function(error) {
    console.log(error);
  },
  complete: function() {
    console.log("done");
  }
}
observable.subscribe(observer);
emitter.next('foo');
emitter.next('bar');
emitter.next('baz');
emitter.complete();

//console output
//"foo"
//"bar"
//"baz"
//"done"

Sim Assunto facilita, fornecendo Observável e Observador no mesmo objeto, mas não é exatamente o mesmo, pois o Assunto permite que você assine vários observadores no mesmo observável quando um observável envia apenas dados para o último observador inscrito, portanto, use-o conscientemente . Aqui está um JsBin se você quiser mexer com ele.

theFreedomBanana
fonte
A possibilidade de sobrescrever a propriedade do emissor está documentada em algum lugar nos manuais do RxJS?
Tomas
Nesse caso, emitterhaverá apenas next()novos valores para o observador que assinou o último. Uma abordagem melhor seria para recolher todos emitters em uma matriz e percorrer todos eles e nexto valor em cada um deles
Eric Gopak