Combine uma lista de observáveis ​​e espere até que tudo esteja concluído

91

TL; DR Como converter Task.whenAll(List<Task>)em RxJava?

Meu código existente usa Bolts para criar uma lista de tarefas assíncronas e espera até que todas essas tarefas sejam concluídas antes de executar outras etapas. Basicamente, ele cria um List<Task>e retorna um, Taskque é marcado como concluído quando todas as tarefas da lista são concluídas, conforme o exemplo no site da Bolts .

Estou procurando substituir Boltspor RxJavae presumo que esse método de construir uma lista de tarefas assíncronas (tamanho não conhecido com antecedência) e agrupá-las todas em uma única Observableé possível, mas não sei como.

Eu tentei olhar para merge, zip, concatetc ... mas não pode começar a trabalhar no List<Observable>que eu estaria construindo como todos eles parecem orientados para trabalhar em apenas dois Observablesde cada vez, se eu entender os documentos corretamente.

Estou tentando aprender RxJavae ainda sou muito novo nisso, então me perdoe se esta é uma pergunta óbvia ou explicada na documentação em algum lugar; Eu tentei pesquisar. Qualquer ajuda seria muito apreciada.

Craig Russell
fonte

Respostas:

73

Parece que você está procurando a operadora Zip .

Existem algumas maneiras diferentes de usá-lo, então vamos ver um exemplo. Digamos que temos alguns observáveis ​​simples de diferentes tipos:

Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);

A maneira mais simples de esperar por todos eles é algo assim:

Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));

Observe que na função zip, os parâmetros têm tipos concretos que correspondem aos tipos dos observáveis ​​sendo compactados.

Compactar uma lista de observáveis ​​também é possível, diretamente:

List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);

Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

... ou agrupando a lista em Observable<Observable<?>>:

Observable<Observable<?>> obsObs = Observable.from(obsList);

Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

No entanto, em ambos os casos, a função zip só pode aceitar um único Object[]parâmetro, uma vez que os tipos de observáveis ​​na lista não são conhecidos com antecedência, bem como seu número. Isso significa que a função zip teria que verificar o número de parâmetros e convertê-los de acordo.

Independentemente disso, todos os exemplos acima acabarão por imprimir 1 Blah true

EDITAR: Ao usar o Zip, certifique-se de que o Observablescompactado emita o mesmo número de itens. Nos exemplos acima, todos os três observáveis ​​emitiram um único item. Se tivéssemos que mudá-los para algo assim:

Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items

Então, 1, Blah, Truee 2, Hello, Trueseriam os únicos itens passados ​​para a (s) função (ões) zip. O item 3nunca seria compactado, pois os outros observáveis ​​foram concluídos.

Malte
fonte
9
Isso não funcionará se uma das chamadas falhar. Nesse caso, todas as chamadas serão perdidas.
StarWind0
1
@ StarWind0 você pode pular o erro por uso onErrorResumeNext, exemplo:Observable.zip(ob1, ob2........).onErrorResumeNext(Observable.<String>empty())
vuhung3990
E se eu tiver 100 observáveis?
Krzysztof Kubicki
80

Você pode usar flatMapcaso tenha composição de tarefas dinâmicas. Algo assim:

public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
    return Observable.from(tasks)
            //execute in parallel
            .flatMap(task -> task.observeOn(Schedulers.computation()))
            //wait, until all task are executed
            //be aware, all your observable should emit onComplemete event
            //otherwise you will wait forever
            .toList()
            //could implement more intelligent logic. eg. check that everything is successful
            .map(results -> true);
}

Outro bom exemplo de execução paralela

Nota: Eu realmente não sei seus requisitos para tratamento de erros. Por exemplo, o que fazer se apenas uma tarefa falhar. Acho que você deve verificar este cenário.

MyDogTom
fonte
17
Essa deve ser a resposta aceita, considerando que a pergunta afirma "quando todas as tarefas na lista forem concluídas". zipnotifica sobre a conclusão assim que uma das tarefas é concluída e, portanto, não é aplicável.
user3707125
1
@MyDogTom: Você pode atualizar a resposta com a versão da sintaxe Java7 (não a lambda)?
sanedroid
3
@PoojaGaikwad Com lambda é mais legível. Basta substituir o primeiro lambda por new Func1<Observable<Boolean>, Observable<Boolean>>()...e o segundo pornew Func1<List<Boolean>, Boolean>()
MyDogTom
@soshial RxJava 2 é a pior coisa que já aconteceu com RxJava, sim
egorikem
16

Das sugestões propostas, zip () realmente combina resultados observáveis ​​uns com os outros, que podem ou não ser o que se deseja, mas não foi feito na pergunta. Na questão, tudo o que se queria era a execução de cada uma das operações, uma por uma ou em paralelo (o que não foi especificado, mas o exemplo de Bolts vinculado era sobre execução paralela). Além disso, zip () será concluído imediatamente quando qualquer um dos observáveis ​​for concluído, portanto, viola os requisitos.

Para a execução paralela de Observables, flatMap () apresentado na outra resposta é adequado , mas merge () seria mais direto. Observe que a fusão será encerrada por erro de qualquer um dos Observáveis, se você preferir adiar a saída até que todos os observáveis ​​tenham terminado, você deve olhar para mergeDelayError () .

Para um por um, acho que o método estático Observable.concat () deve ser usado. Seu javadoc afirma assim:

concat (java.lang.Iterable> sequence) Nivela um Iterable de Observables em um Observable, um após o outro, sem intercalá-los

que soa como o que você está procurando se não quiser execução paralela.

Além disso, se você estiver interessado apenas na conclusão de sua tarefa, não em valores de retorno, você provavelmente deve procurar Completável em vez de Observável .

TLDR: para a execução de tarefas uma por uma e evento oncompletion quando forem concluídas, acho que Completable.concat () é mais adequado. Para execução paralela, Completable.merge () ou Completable.mergeDelayError () parece a solução. O primeiro irá parar imediatamente em qualquer erro em qualquer completável, o último irá executá-los todos, mesmo se um deles tiver um erro, e só então relata o erro.

eis
fonte
2

Você provavelmente olhou para o zipoperador que funciona com 2 observáveis.

Existe também o método estático Observable.zip. Ele tem um formulário que deve ser útil para você:

zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)

Você pode verificar o javadoc para mais informações.

LordRaydenMK
fonte
2

Com Kotlin

Observable.zip(obs1, obs2, BiFunction { t1 : Boolean, t2:Boolean ->

})

É importante definir o tipo para os argumentos da função ou você terá erros de compilação

O último tipo de argumento muda com o número do argumento: BiFunction para 2 Função3 para 3 Função4 para 4 ...

Kevin ABRIOUX
fonte
1

Estou escrevendo um código de computação em Kotlin com JavaRx Observables e RxKotlin. Quero observar uma lista de observáveis ​​a serem concluídas e, entretanto, fornecer-me uma atualização com o progresso e os resultados mais recentes. No final, ele retorna o melhor resultado do cálculo. Um requisito extra era executar Observables em paralelo para usar todos os meus núcleos de cpu. Acabei com esta solução:

@Volatile var results: MutableList<CalculationResult> = mutableListOf()

fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> {

    return Observable.create { subscriber ->
        Observable.concatEager(listOfCalculations.map { calculation: Calculation ->
            doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result
        }).subscribeBy(
            onNext = {
                results.add(it)
                subscriber.onNext(Pair("A calculation is ready", it))

            },
            onComplete = {
                subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)) 
                subscriber.onComplete()
            },
            onError = {
                subscriber.onError(it)
            }
        )
    }
}
Sjors
fonte
não estou familiarizado com RxKotlin ou @Volatile, mas como isso funcionaria se ele estivesse sendo chamado por vários threads ao mesmo tempo? O que aconteceria com os resultados?
eis
0

Eu tive um problema semelhante, precisei buscar itens de pesquisa na chamada de descanso enquanto também integrava sugestões salvas de um RecentSearchProvider.AUTHORITY e combinava-as em uma lista unificada. Eu estava tentando usar a solução @MyDogTom, infelizmente não há Observable.from no RxJava. Depois de alguma pesquisa, encontrei uma solução que funcionou para mim.

 fun getSearchedResultsSuggestions(context : Context, query : String) : Single<ArrayList<ArrayList<SearchItem>>>
{
    val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0)
    fetchedItems.add(fetchSearchSuggestions(context,query).toObservable())
    fetchedItems.add(getSearchResults(query).toObservable())

    return Observable.fromArray(fetchedItems)
        .flatMapIterable { data->data }
        .flatMap {task -> task.observeOn(Schedulers.io())}
        .toList()
        .map { ArrayList(it) }
}

Criei um observável a partir da matriz de observáveis ​​que contém listas de sugestões e resultados da Internet dependendo da consulta. Depois disso, basta passar por cima dessas tarefas com flatMapIterable e executá-las usando flatmap, colocar os resultados em array, que podem ser buscados posteriormente em uma visualização de reciclagem.

Anton Makov
fonte
0

Se você usar o Project Reactor, poderá usar Mono.when.

Mono.when(publisher1, publisher2)
.map(i-> {
    System.out.println("everything is done!");
    return i;
}).block()
Roe
fonte