Como obter o valor atual de RxJS Subject ou Observable?

207

Eu tenho um serviço Angular 2:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {
  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
  }
}

Tudo funciona muito bem. Mas eu tenho outro componente que não precisa se inscrever, ele só precisa obter o valor atual de isLoggedIn em um determinado momento. Como posso fazer isso?

Baconbeastnz
fonte

Respostas:

341

A Subjectou Observablenão tem um valor atual. Quando um valor é emitido, ele é passado aos assinantes e aoObservable é feito com ele.

Se você deseja ter um valor atual, use o BehaviorSubjectque foi projetado exatamente para esse fim.BehaviorSubjectmantém o último valor emitido e o emite imediatamente para novos assinantes.

Também possui um método getValue()para obter o valor atual.

Günter Zöchbauer
fonte
1
Oi, é ruim, eles são a mesma coisa agora no rxjs 5. O link do código fonte acima se referia ao rxjs4.
código de schrodinger 15/06
84
Nota importante do autor do RxJS 5: Usar getValue()é uma enorme bandeira vermelha de que você está fazendo algo errado. Está lá como uma escotilha de fuga. Geralmente tudo o que você faz com o RxJS deve ser declarativo. getValue()é imperativo. Se você estiver usando getValue(), há uma chance de 99,9% de fazer algo errado ou estranho.
Ben Lesh
1
@BenLesh, e se eu tiver um BehaviorSubject<boolean>(false)e gostar de alterná-lo?
bob
3
@BenLesh getValue () é muito útil, por exemplo, executando uma ação instantânea, como onClick-> dispatchToServer (x.getValue ()), mas não o usa em cadeias observáveis.
FlavorScape 26/04/19
2
@ IngoBürk A única maneira de justificar sua sugestão é se você não sabia que foo$era uma BehaviorSubject(ou seja, é definida como Observablee talvez seja quente ou fria de uma fonte adiada). No entanto uma vez que, em seguida, ir para o uso .nextem $foosi que significa que a sua implementação depende de ele ser um BehaviorSubjectmodo que não há justificativa para não apenas usando .valuepara obter o valor atual em primeiro lugar.
22618 Simon_Weaver
145

A única maneira que você deve estar recebendo valores "fora de" um observável / Assunto é com se inscrever!

Se você estiver usando, getValue()estará fazendo algo imperativo no paradigma declarativo. Está lá como uma escotilha de fuga, mas 99,9% do tempo você NÃO deve usar getValue(). Existem algumas coisas interessantes quegetValue() farão: Irá gerar um erro se o assunto tiver sido cancelado, impedirá que você obtenha um valor se o assunto estiver morto por estar errado, etc. Mas, novamente, existe como uma fuga eclodem em circunstâncias raras.

Existem várias maneiras de obter o valor mais recente de um Assunto ou Observável da maneira "Rx-y":

  1. Usando BehaviorSubject: Mas realmente assinando . Quando você assina pela primeira vezBehaviorSubject vez, enviará de forma síncrona o valor anterior que recebeu ou foi inicializado.
  2. Usando um ReplaySubject(N): Isso armazenará em cacheN valores e reproduzi-las aos novos assinantes.
  3. A.withLatestFrom(B): Use esse operador para obter o valor mais recente de observável Bquando Aemitido. Fornecerá os dois valores em uma matriz [a, b].
  4. A.combineLatest(B): Use este operador para obter os valores mais recentes do Ae Bcada vez que quer Aou Bemite. Fornecerá os dois valores em uma matriz.
  5. shareReplay(): Faz um multicast observável por meio de um ReplaySubject , mas permite repetir o observável por erro. (Basicamente, fornece esse comportamento de armazenamento em cache promissor).
  6. publishReplay(), publishBehavior(initialValue), multicast(subject: BehaviorSubject | ReplaySubject), Etc: Outros operadores que a alavancagem BehaviorSubjecte ReplaySubject. Diferentes sabores da mesma coisa, eles basicamente fazem multicast da fonte observável, canalizando todas as notificações através de um assunto. Você precisa ligar connect()para se inscrever na fonte com o assunto.
Ben Lesh
fonte
19
você pode explicar por que usar getValue () é uma bandeira vermelha? E se eu precisar do valor atual do observável apenas uma vez, no momento em que o usuário clicar em um botão? Devo assinar o valor e cancelar a assinatura imediatamente?
Chama
5
Está tudo bem às vezes. Mas muitas vezes é um sinal de que o autor do código está fazendo algo muito imperativo (geralmente com efeitos colaterais) onde não deveria estar. Você pode fazer click$.mergeMap(() => behaviorSubject.take(1))para resolver seu problema também.
precisa
1
Ótima explicação! Na verdade, se você pode manter o Observable e usar os métodos, é uma maneira muito melhor. Em um contexto de texto datilografado com Observable usado em todos os lugares, mas apenas alguns definidos como BehaviourSubject, seria um código menos consistente. Os métodos que você propôs me permitiram manter os tipos observáveis ​​em todos os lugares.
JLavoie
2
é bom se você quiser simplesmente despachar o valor atual (como enviá-lo ao servidor ao clicar), fazer um getValue () é muito útil. mas não o use ao encadear operadores observáveis.
FlavorScape
1
Eu tenho um AuthenticationServiceque usa um BehaviourSubjectpara armazenar o estado atual conectado ( boolean trueou false). Ele expõe um isLoggedIn$observável para assinantes que desejam saber quando o estado muda. Ele também expõe uma get isLoggedIn()propriedade, que retorna o estado atual de logon chamando getValue()o subjacente BehaviourSubject- isso é usado pelo meu guarda de autenticação para verificar o estado atual. Parece-me um uso sensato getValue()...?
Dan King
13

Eu tive uma situação semelhante em que assinantes atrasados ​​assinam o Assunto após a chegada do valor.

Eu encontrei ReplaySubject, que é semelhante ao BehaviorSubject, funciona como um encanto neste caso. E aqui está um link para uma melhor explicação: http://reactivex.io/rxjs/manual/overview.html#replaysubject

Kfir Erez
fonte
1
que me ajudou em meu aplicativo Angular4 - Eu tive que mudar a assinatura do construtor componente para o ngOnInit () (tal componente é compartilhado entre as rotas aéreas), deixando apenas aqui no caso de alguém tem um problema semelhante
Luca
1
Eu tive um problema com um aplicativo Angular 5 em que estava usando um serviço para obter valores de uma API e definir variáveis ​​em diferentes componentes. Eu estava usando assuntos / observáveis, mas ele não pressionava os valores após uma alteração de rota. ReplaySubject foi uma queda no substituto do Subject e resolveu tudo.
precisa saber é o seguinte
1
Certifique-se de que você está usando ReplaySubject (1), caso contrário novos assinantes irá obter cada valor previamente emitida em sequência - isto nem sempre é óbvia em tempo de execução
Drenai
@Drenai, tanto quanto eu entendo ReplaySubject (1) se comportam da mesma como BehaviorSubject ()
Kfir Erez
Não é a mesma coisa, a grande diferença é que o ReplaySubject não emite imediatamente um valor padrão quando é inscrito se sua next()função ainda não foi invocada, enquanto o BehaviourSubject emite. O emitem imediato é muito útil para a aplicação de valores padrão para uma visão, quando o BehaviourSubject é usado como fonte de dados observáveis em um serviço, por exemplo
Drenai
5
const observable = of('response')

function hasValue(value: any) {
  return value !== null && value !== undefined;
}

function getValue<T>(observable: Observable<T>): Promise<T> {
  return observable
    .pipe(
      filter(hasValue),
      first()
    )
    .toPromise();
}

const result = await getValue(observable)
// Do the logic with the result
// .................
// .................
// .................

Você pode conferir o artigo completo sobre como implementá-lo aqui. https://www.imkrish.com/how-to-get-current-value-of-observable-in-a-clean-way/

Keerati Limkulphong
fonte
3

Encontrei o mesmo problema em componentes filhos, nos quais, inicialmente, ele teria que ter o valor atual do Assunto, depois me inscrevi no Assunto para ouvir as alterações. Eu apenas mantenho o valor atual no Serviço para que ele esteja disponível para acesso aos componentes, por exemplo:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {

  isLoggedIn: boolean;

  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
    this.currIsLoggedIn = false;
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
    this.isLoggedIn = value;
  }
}

Um componente que precisa do valor atual pode acessá-lo a partir do serviço, ou seja:

sessionStorage.isLoggedIn

Não tenho certeza se esta é a prática correta :)

Molp Burnbright
fonte
2
Se você precisar do valor de um observável na visualização do componente, basta usar o asyncpipe.
Ingo Bürk
2

Um semelhante à procura resposta foi downvoted. Mas acho que posso justificar o que estou sugerindo aqui para casos limitados.


Embora seja verdade que um observável não tenha um valor atual , muitas vezes ele terá um valor imediatamente disponível . Por exemplo, nos repositórios redux / flux / akita, você pode solicitar dados de um repositório central, com base em vários observáveis ​​e esse valor geralmente estará disponível imediatamente.

Se for esse o caso, quando você subscribeo valor voltará imediatamente.

Então, digamos que você tenha telefonado para um serviço e, ao concluir, deseja obter o valor mais recente de algo da sua loja, que potencialmente não seja emitido :

Você pode tentar fazer isso (e manter o máximo possível as coisas 'dentro dos tubos'):

 serviceCallResponse$.pipe(withLatestFrom(store$.select(x => x.customer)))
                     .subscribe(([ serviceCallResponse, customer] => {

                        // we have serviceCallResponse and customer 
                     });

O problema é que ele será bloqueado até que o observável secundário emita um valor, o que potencialmente nunca poderia ser.

Recentemente, me vi precisando avaliar um observável apenas se um valor estivesse disponível imediatamente e, mais importante, eu precisava ser capaz de detectar se não estava. Acabei fazendo isso:

 serviceCallResponse$.pipe()
                     .subscribe(serviceCallResponse => {

                        // immediately try to subscribe to get the 'available' value
                        // note: immediately unsubscribe afterward to 'cancel' if needed
                        let customer = undefined;

                        // whatever the secondary observable is
                        const secondary$ = store$.select(x => x.customer);

                        // subscribe to it, and assign to closure scope
                        sub = secondary$.pipe(take(1)).subscribe(_customer => customer = _customer);
                        sub.unsubscribe();

                        // if there's a delay or customer isn't available the value won't have been set before we get here
                        if (customer === undefined) 
                        {
                           // handle, or ignore as needed
                           return throwError('Customer was not immediately available');
                        }
                     });

Observe que, para todas as opções acima, estou usando subscribepara obter o valor (como @Ben discute). Não .valueestou usando uma propriedade, mesmo que eu tivesse um BehaviorSubject.

Simon_Weaver
fonte
1
Btw isso funciona porque, por padrão, 'agenda' usa o segmento atual.
Simon_Weaver 27/01/19
1

Embora possa parecer um exagero, essa é apenas outra solução "possível" para manter o tipo Observable e reduzir o padrão ...

Você sempre pode criar um getter de extensão para obter o valor atual de um Observável.

Para fazer isso, você precisaria estender a Observable<T>interface em um global.d.tsarquivo de declaração de digitação. Em seguida, implemente o getter de extensão em um observable.extension.tsarquivo e, finalmente, inclua os tipos e o arquivo de extensão no seu aplicativo.

Você pode consultar esta Resposta do StackOverflow para saber como incluir as extensões no seu aplicativo Angular.

// global.d.ts
declare module 'rxjs' {
  interface Observable<T> {
    /**
     * _Extension Method_ - Returns current value of an Observable.
     * Value is retrieved using _first()_ operator to avoid the need to unsubscribe.
     */
    value: Observable<T>;
  }
}

// observable.extension.ts
Object.defineProperty(Observable.prototype, 'value', {
  get <T>(this: Observable<T>): Observable<T> {
    return this.pipe(
      filter(value => value !== null && value !== undefined),
      first());
  },
});

// using the extension getter example
this.myObservable$.value
  .subscribe(value => {
    // whatever code you need...
  });
j3ff
fonte
0

Você pode armazenar o último valor emitido separadamente do Observável. Em seguida, leia-o quando necessário.

let lastValue: number;

const subscription = new Service().start();
subscription
    .subscribe((data) => {
        lastValue = data;
    }
);
Slawa
fonte
1
Não é uma abordagem reativa armazenar algumas coisas fora do observável. Em vez disso, você deve ter o máximo de dados possível fluindo dentro dos fluxos observáveis.
precisa saber é o seguinte
0

A melhor maneira de fazer isso é usar Behaviur Subject, aqui está um exemplo:

var sub = new rxjs.BehaviorSubject([0, 1])
sub.next([2, 3])
setTimeout(() => {sub.next([4, 5])}, 1500)
sub.subscribe(a => console.log(a)) //2, 3 (current value) -> wait 2 sec -> 4, 5
yaya
fonte
0

Uma assinatura pode ser criada e após a destruição do primeiro item emitido. Pipe é uma função que usa um Observable como entrada e retorna outro Observable como saída, sem modificar o primeiro observável. Angular 8.1.0. Pacotes: "rxjs": "6.5.3","rxjs-observable": "0.0.7"

  ngOnInit() {

    ...

    // If loading with previously saved value
    if (this.controlValue) {

      // Take says once you have 1, then close the subscription
      this.selectList.pipe(take(1)).subscribe(x => {
        let opt = x.find(y => y.value === this.controlValue);
        this.updateValue(opt);
      });

    }
  }
SushiGuy
fonte