Digamos que tenho um Observable
, assim:
var one = someObservable.take(1);
one.subscribe(function(){ /* do something */ });
Então, eu tenho um segundo Observable
:
var two = someOtherObservable.take(1);
Agora, eu quero subscribe()
a two
, mas eu quero ter certeza de que one
tenha concluído antes do two
assinante é acionado.
Que tipo de método de buffer posso usar two
para fazer o segundo esperar até que o primeiro seja concluído?
Suponho que pretendo fazer uma pausa two
até one
terminar.
javascript
observable
rxjs
Stephen
fonte
fonte
Respostas:
Algumas maneiras que posso pensar
import {take, publish} from 'rxjs/operators' import {concat} from 'rxjs' //Method one var one = someObservable.pipe(take(1)); var two = someOtherObservable.pipe(take(1)); concat(one, two).subscribe(function() {/*do something */}); //Method two, if they need to be separate for some reason var one = someObservable.pipe(take(1)); var two = someOtherObservable.pipe(take(1), publish()); two.subscribe(function(){/*do something */}); one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
fonte
pause
e emresume
vez depublish
econnect
, mas o exemplo dois é essencialmente o caminho que fiz.one
) resolva antes do segundo (two
) dentro da função subscribe ()?Observable.forkJoin()
? Veja este link learnrxjs.io/operators/combination/forkjoin.htmlforkJoin
assina simultaneamente.Se você quiser ter certeza de que a ordem de execução é mantida, você pode usar flatMap como o exemplo a seguir
const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i)); const second = Rx.Observable.of(11).delay(500).do(i => console.log(i)); const third = Rx.Observable.of(111).do(i => console.log(i)); first .flatMap(() => second) .flatMap(() => third) .subscribe(()=> console.log('finished'));
O resultado seria:
"1" "11" "111" "finished"
fonte
skipUntil () com last ()
skipUntil: ignora itens emitidos até que outro observável tenha emitido
último: emite o último valor de uma sequência (ou seja, espere até que seja concluído e emita)
Observe que qualquer coisa emitida pelo observável passado para
skipUntil
cancelará o salto, e é por isso que precisamos adicionarlast()
- para esperar que o fluxo seja concluído.Oficial: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil
Possível problema: observe que,
last()
por si só, ocorrerá um erro se nada for emitido. Olast()
operador tem umdefault
parâmetro, mas apenas quando usado em conjunto com um predicado. Acho que se esta situação é um problema para você (sesequence2$
pode ser concluída sem emitir), então um destes deve funcionar (atualmente não testado):main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last())) main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))
Observe que
undefined
é um item válido para ser emitido, mas na verdade pode ser qualquer valor. Observe também que este é o tubo conectadosequence2$
e não omain$
tubo.fonte
Aqui está outra possibilidade, aproveitando o seletor de resultados de switchMap
var one$ = someObservable.take(1); var two$ = someOtherObservable.take(1); two$.switchMap( /** Wait for first Observable */ () => one$, /** Only return the value we're actually interested in */ (value2, value1) => value2 ) .subscribe((value2) => { /* do something */ });
Como o seletor de resultados do switchMap foi depreciado, aqui está uma versão atualizada
const one$ = someObservable.pipe(take(1)); const two$ = someOtherObservable.pipe( take(1), switchMap(value2 => one$.map(_ => value2)) ); two$.subscribe(value2 => { /* do something */ });
fonte
Esta é uma maneira reutilizável de fazer isso (é datilografado, mas você pode adaptá-lo para js):
export function waitFor<T>(signal: Observable<any>) { return (source: Observable<T>) => new Observable<T>(observer => signal.pipe(first()) .subscribe(_ => source.subscribe(observer) ) ); }
e você pode usá-lo como qualquer operador:
var two = someOtherObservable.pipe(waitFor(one), take(1));
É basicamente um operador que adia a assinatura na fonte observável até que o sinal observável emita o primeiro evento.
fonte
Se o segundo observável estiver quente , há outra maneira de pausar / retomar :
var pauser = new Rx.Subject(); var source1 = Rx.Observable.interval(1000).take(1); /* create source and pause */ var source2 = Rx.Observable.interval(1000).pausable(pauser); source1.doOnCompleted(function () { /* resume paused source2 */ pauser.onNext(true); }).subscribe(function(){ // do something }); source2.subscribe(function(){ // start to recieve data });
Além disso, você pode usar a versão com buffer pausableBuffered para manter os dados durante a pausa.
fonte
Aqui está outra, mas me sinto mais direta e intuitiva (ou pelo menos natural se você está acostumado com Promessas), abordagem. Basicamente, você cria um Observable usando
Observable.create()
para embrulharone
etwo
como um único Observable. Isso é muito semelhante a comoPromise.all()
pode funcionar.var first = someObservable.take(1); var second = Observable.create((observer) => { return first.subscribe( function onNext(value) { /* do something with value like: */ // observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { someOtherObservable.take(1).subscribe( function onNext(value) { observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { observer.complete(); } ); } ); });
Então, o que está acontecendo aqui? Primeiro, criamos um novo Observable. A função passada
Observable.create()
, apropriadamente nomeadaonSubscription
, é passada ao observador (construída a partir dos parâmetros que você passasubscribe()
), que é semelhanteresolve
ereject
combinada em um único objeto ao criar uma nova promessa. É assim que fazemos a mágica funcionar.Em
onSubscription
, assinamos o primeiro Observable (no exemplo acima, isso foi chamadoone
). Como lidamosnext
eerror
depende de você, mas o padrão fornecido em meu exemplo deve ser apropriado em geral. No entanto, quando recebermos ocomplete
evento, o que significa queone
agora está concluído, podemos nos inscrever no próximo Observable; assim, disparando o segundo Observável após a conclusão do primeiro.O exemplo de observador fornecido para o segundo Observable é bastante simples. Basicamente,
second
agora atua como você esperariatwo
agir no OP. Mais especificamente,second
emitirá o primeiro e apenas o primeiro valor emitido porsomeOtherObservable
( por causa detake(1)
) e, em seguida, será concluído, assumindo que não haja erro.Exemplo
Aqui está um exemplo funcional completo que você pode copiar / colar se quiser ver meu exemplo funcionando na vida real:
var someObservable = Observable.from([1, 2, 3, 4, 5]); var someOtherObservable = Observable.from([6, 7, 8, 9]); var first = someObservable.take(1); var second = Observable.create((observer) => { return first.subscribe( function onNext(value) { /* do something with value like: */ observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { someOtherObservable.take(1).subscribe( function onNext(value) { observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { observer.complete(); } ); } ); }).subscribe( function onNext(value) { console.log(value); }, function onError(error) { console.error(error); }, function onComplete() { console.log("Done!"); } );
Se você observar o console, o exemplo acima imprimirá:
fonte
Este é um operador personalizado escrito com TypeScript que espera por um sinal antes de emitir os resultados:
export function waitFor<T>( signal$: Observable<any> ) { return (source$: Observable<T>) => new Observable<T>(observer => { // combineLatest emits the first value only when // both source and signal emitted at least once combineLatest([ source$, signal$.pipe( first(), ), ]) .subscribe(([v]) => observer.next(v)); }); }
Você pode usá-lo assim:
two.pipe(waitFor(one)) .subscribe(value => ...);
fonte
bem, eu sei que isso é muito antigo, mas acho que o que você precisa é:
var one = someObservable.take(1); var two = someOtherObservable.pipe( concatMap((twoRes) => one.pipe(mapTo(twoRes))), take(1) ).subscribe((twoRes) => { // one is completed and we get two's subscription. })
fonte
Você pode usar o resultado emitido de Observable anterior graças ao operador mergeMap (ou seu alias flatMap ) como este:
const one = Observable.of('https://api.github.com/users'); const two = (c) => ajax(c);//ajax from Rxjs/dom library one.mergeMap(two).subscribe(c => console.log(c))
fonte