TL; DR
Como converter Task.whenAll(List<Task>)
em RxJava
?
Meu código existente usa Bolts para criar uma lista de tarefas assíncronas e espera até que todas essas tarefas sejam concluídas antes de executar outras etapas. Basicamente, ele cria um List<Task>
e retorna um, Task
que é marcado como concluído quando todas as tarefas da lista são concluídas, conforme o exemplo no site da Bolts .
Estou procurando substituir Bolts
por RxJava
e presumo que esse método de construir uma lista de tarefas assíncronas (tamanho não conhecido com antecedência) e agrupá-las todas em uma única Observable
é possível, mas não sei como.
Eu tentei olhar para merge
, zip
, concat
etc ... mas não pode começar a trabalhar no List<Observable>
que eu estaria construindo como todos eles parecem orientados para trabalhar em apenas dois Observables
de cada vez, se eu entender os documentos corretamente.
Estou tentando aprender RxJava
e ainda sou muito novo nisso, então me perdoe se esta é uma pergunta óbvia ou explicada na documentação em algum lugar; Eu tentei pesquisar. Qualquer ajuda seria muito apreciada.
fonte
onErrorResumeNext
, exemplo:Observable.zip(ob1, ob2........).onErrorResumeNext(Observable.<String>empty())
Você pode usar
flatMap
caso tenha composição de tarefas dinâmicas. Algo assim:public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) { return Observable.from(tasks) //execute in parallel .flatMap(task -> task.observeOn(Schedulers.computation())) //wait, until all task are executed //be aware, all your observable should emit onComplemete event //otherwise you will wait forever .toList() //could implement more intelligent logic. eg. check that everything is successful .map(results -> true); }
Outro bom exemplo de execução paralela
Nota: Eu realmente não sei seus requisitos para tratamento de erros. Por exemplo, o que fazer se apenas uma tarefa falhar. Acho que você deve verificar este cenário.
fonte
zip
notifica sobre a conclusão assim que uma das tarefas é concluída e, portanto, não é aplicável.new Func1<Observable<Boolean>, Observable<Boolean>>()...
e o segundo pornew Func1<List<Boolean>, Boolean>()
Das sugestões propostas, zip () realmente combina resultados observáveis uns com os outros, que podem ou não ser o que se deseja, mas não foi feito na pergunta. Na questão, tudo o que se queria era a execução de cada uma das operações, uma por uma ou em paralelo (o que não foi especificado, mas o exemplo de Bolts vinculado era sobre execução paralela). Além disso, zip () será concluído imediatamente quando qualquer um dos observáveis for concluído, portanto, viola os requisitos.
Para a execução paralela de Observables, flatMap () apresentado na outra resposta é adequado , mas merge () seria mais direto. Observe que a fusão será encerrada por erro de qualquer um dos Observáveis, se você preferir adiar a saída até que todos os observáveis tenham terminado, você deve olhar para mergeDelayError () .
Para um por um, acho que o método estático Observable.concat () deve ser usado. Seu javadoc afirma assim:
que soa como o que você está procurando se não quiser execução paralela.
Além disso, se você estiver interessado apenas na conclusão de sua tarefa, não em valores de retorno, você provavelmente deve procurar Completável em vez de Observável .
TLDR: para a execução de tarefas uma por uma e evento oncompletion quando forem concluídas, acho que Completable.concat () é mais adequado. Para execução paralela, Completable.merge () ou Completable.mergeDelayError () parece a solução. O primeiro irá parar imediatamente em qualquer erro em qualquer completável, o último irá executá-los todos, mesmo se um deles tiver um erro, e só então relata o erro.
fonte
Você provavelmente olhou para o
zip
operador que funciona com 2 observáveis.Existe também o método estático
Observable.zip
. Ele tem um formulário que deve ser útil para você:Você pode verificar o javadoc para mais informações.
fonte
Com Kotlin
É importante definir o tipo para os argumentos da função ou você terá erros de compilação
O último tipo de argumento muda com o número do argumento: BiFunction para 2 Função3 para 3 Função4 para 4 ...
fonte
Estou escrevendo um código de computação em Kotlin com JavaRx Observables e RxKotlin. Quero observar uma lista de observáveis a serem concluídas e, entretanto, fornecer-me uma atualização com o progresso e os resultados mais recentes. No final, ele retorna o melhor resultado do cálculo. Um requisito extra era executar Observables em paralelo para usar todos os meus núcleos de cpu. Acabei com esta solução:
@Volatile var results: MutableList<CalculationResult> = mutableListOf() fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> { return Observable.create { subscriber -> Observable.concatEager(listOfCalculations.map { calculation: Calculation -> doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result }).subscribeBy( onNext = { results.add(it) subscriber.onNext(Pair("A calculation is ready", it)) }, onComplete = { subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)) subscriber.onComplete() }, onError = { subscriber.onError(it) } ) } }
fonte
@Volatile
, mas como isso funcionaria se ele estivesse sendo chamado por vários threads ao mesmo tempo? O que aconteceria com os resultados?Eu tive um problema semelhante, precisei buscar itens de pesquisa na chamada de descanso enquanto também integrava sugestões salvas de um RecentSearchProvider.AUTHORITY e combinava-as em uma lista unificada. Eu estava tentando usar a solução @MyDogTom, infelizmente não há Observable.from no RxJava. Depois de alguma pesquisa, encontrei uma solução que funcionou para mim.
fun getSearchedResultsSuggestions(context : Context, query : String) : Single<ArrayList<ArrayList<SearchItem>>> { val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0) fetchedItems.add(fetchSearchSuggestions(context,query).toObservable()) fetchedItems.add(getSearchResults(query).toObservable()) return Observable.fromArray(fetchedItems) .flatMapIterable { data->data } .flatMap {task -> task.observeOn(Schedulers.io())} .toList() .map { ArrayList(it) } }
Criei um observável a partir da matriz de observáveis que contém listas de sugestões e resultados da Internet dependendo da consulta. Depois disso, basta passar por cima dessas tarefas com flatMapIterable e executá-las usando flatmap, colocar os resultados em array, que podem ser buscados posteriormente em uma visualização de reciclagem.
fonte
Se você usar o Project Reactor, poderá usar
Mono.when
.Mono.when(publisher1, publisher2) .map(i-> { System.out.println("everything is done!"); return i; }).block()
fonte