Os fluxos do Java 8 são semelhantes aos observáveis do RxJava?
Definição de fluxo do Java 8:
As classes no novo
java.util.stream
pacote fornecem uma API de fluxo para suportar operações de estilo funcional em fluxos de elementos.
java-8
java-stream
rx-java
observable
rahulrv
fonte
fonte
Respostas:
TL; DR : todas as bibliotecas de processamento de sequência / fluxo estão oferecendo API muito semelhante para a construção de pipeline. As diferenças estão na API para lidar com multiencadeamento e composição de pipelines.
O RxJava é bem diferente do Stream. De todas as coisas do JDK, o mais próximo de rx.Observable talvez seja a combinação
java.util.stream.CollectorStream + CompletableFuture (que custa um custo para lidar com a camada extra de mônada, ou seja, ter que lidar com a conversão entreStream<CompletableFuture<T>>
eCompletableFuture<Stream<T>>
).Existem diferenças significativas entre Observable e Stream:
Stream#parallel()
divide a sequência em partiçõesObservable#subscribeOn()
eObservable#observeOn()
não; é difícil imitar oStream#parallel()
comportamento com o Observable, ele já teve um.parallel()
método, mas esse método causou tanta confusão que o.parallel()
suporte foi movido para o repositório separado no github, RxJavaParallel. Mais detalhes estão em outra resposta .Stream#parallel()
não permite especificar um pool de threads a ser usado, ao contrário da maioria dos métodos RxJava que aceitam o Agendador opcional. Como todas as instâncias de fluxo em uma JVM usam o mesmo conjunto de junções de bifurcação, a adição.parallel()
pode afetar acidentalmente o comportamento em outro módulo do seu programaObservable#interval()
,Observable#window()
como muitas outras; isso ocorre principalmente porque os Streams são baseados em pull e o upstream não tem controle sobre quando emitir o próximo elemento downstreamtakeWhile()
,takeUntil()
); a solução alternativaStream#anyMatch()
é limitada: é uma operação terminal, portanto você não pode usá-la mais de uma vez por fluxoStreams são difíceis de construir por si mesmo, o Observable pode ser construído de várias maneiras.EDIT: Como observado nos comentários, existem maneiras de construir o Stream. No entanto, como não há curto-circuito não terminal, você não pode, por exemplo, gerar facilmente o Stream of lines in file (o JDK fornece arquivos # lines e BufferedReader # lines fora da caixa, e outros cenários semelhantes podem ser gerenciados através da construção do Stream do Iterator).Observable#using()
); você pode agrupar o fluxo de E / S ou o mutex com ele e garantir que o usuário não esqueça de liberar o recurso - ele será descartado automaticamente no término da assinatura; O fluxo possui umonClose(Runnable)
método, mas você deve chamá-lo manualmente ou por meio de tentativa com recursos. Por exemplo. você deve ter em mente que os arquivos # lines () devem ser colocados no bloco try-with-resources.Resumo: o RxJava difere significativamente do Streams. Alternativas reais do RxJava são outras implementações do ReactiveStreams , por exemplo, parte relevante do Akka.
Update . Existe um truque para usar o pool de junção de forquilha não padrão
Stream#parallel
, consulte Pool de encadeamentos customizados no fluxo paralelo do Java 8Update . Tudo acima é baseado na experiência com o RxJava 1.x. Agora que o RxJava 2.x está aqui , esta resposta pode estar desatualizada.
fonte
Stream.generate()
e passar sua própriaSupplier<U>
implementação, apenas um método simples a partir do qual você fornece o próximo item no fluxo. Existem muitos outros métodos. Para construir facilmente uma sequênciaStream
que depende dos valores anteriores, você pode usar ointerate()
método, cadaCollection
um possui umstream()
método eStream.of()
constrói aStream
partir de uma variável ou matriz. FinalmenteStreamSupport
, suporta a criação de fluxo mais avançada usando spliterators ou para tipos primitivos de fluxo.takeWhile()
,takeUntil()
);" - JDK9 tem estes, creio eu, em TakeWhile () e dropWhile ()O Java 8 Stream e o RxJava são bem parecidos. Eles têm operadores parecidos (filtro, mapa, flatMap ...), mas não são criados para o mesmo uso.
Você pode executar tarefas assíncronas usando RxJava.
Com o Java 8 stream, você percorrerá itens da sua coleção.
Você pode fazer praticamente a mesma coisa no RxJava (itens transversais de uma coleção), mas, como o RxJava é focado em tarefas simultâneas, ..., ele usa sincronização, trava, ... Portanto, a mesma tarefa usando o RxJava pode ser mais lenta que com fluxo Java 8.
O RxJava pode ser comparado
CompletableFuture
, mas isso pode computar mais do que apenas um valor.fonte
parallelStream
suporta sincronização semelhante de traversals simples / mapas / filtragem etc ..Existem algumas diferenças técnicas e conceituais, por exemplo, os fluxos Java 8 são sequências síncronas de valores de uso único, baseadas em pull, enquanto os Observáveis do RxJava são sequências de valores re-observáveis, baseadas em push-pull adaptável e potencialmente assíncronas. O RxJava é voltado para Java 6+ e funciona no Android também.
fonte
Os fluxos do Java 8 são baseados em pull. Você itera sobre um fluxo do Java 8 que consome cada item. E poderia ser um fluxo interminável.
Por
Observable
padrão, o RXJava é baseado em push. Você assina um Observable e será notificado quando o próximo item chegar (onNext
), ou quando o fluxo for concluído (onCompleted
), ou quando ocorrer um erro (onError
). Porque comObservable
você recebeonNext
,onCompleted
,onError
eventos, você pode fazer algumas funções poderosas como a combinação de diferentesObservable
s para uma nova (zip
,merge
,concat
). Outras coisas que você pode fazer é armazenar em cache, limitar e ... E usa mais ou menos a mesma API em diferentes idiomas (RxJava, RX em C #, RxJS, ...)Por padrão, o RxJava é de thread único. A menos que você comece a usar agendadores, tudo acontecerá no mesmo segmento.
fonte
As respostas existentes são abrangentes e corretas, mas falta um exemplo claro para iniciantes. Permitam-me colocar alguns termos concretos por trás, como "push / pull-based" e "re-observable". Nota : Eu odeio o termo
Observable
(é um fluxo pelo amor de Deus), então, simplesmente me refiro aos fluxos J8 vs RX.Considere uma lista de números inteiros,
Um J8 Stream é um utilitário para modificar a coleção. Por exemplo, mesmo dígitos podem ser extraídos como,
Este é basicamente o mapa, o filtro, a redução , o Python , uma adição muito boa (e muito atrasada) ao Java. Mas e se os dígitos não fossem coletados antes do tempo - e se os dígitos estivessem sendo transmitidos enquanto o aplicativo estava em execução - poderíamos filtrar os pares em tempo real.
Imagine que um processo de encadeamento separado esteja produzindo números inteiros em momentos aleatórios enquanto o aplicativo estiver em execução (
---
indica tempo)No RX,
even
pode reagir a cada novo dígito e aplicar o filtro em tempo realNão há necessidade de armazenar listas de entrada e saída. Se você deseja uma lista de saída, também não há problema que possa ser estriado. De fato, tudo é um fluxo.
É por isso que termos como "sem estado" e "funcional" estão mais associados ao RX
fonte
O RxJava também está intimamente relacionado à iniciativa de fluxos reativos e se considera uma implementação simples da API de fluxos reativos (por exemplo, em comparação com a implementação de fluxos Akka ). A principal diferença é que os fluxos reativos foram projetados para lidar com a contrapressão, mas se você der uma olhada na página de fluxos reativos, terá uma idéia. Eles descrevem seus objetivos muito bem e os fluxos também estão intimamente relacionados ao manifesto reativo .
Os fluxos do Java 8 são praticamente a implementação de uma coleção ilimitada, muito semelhante ao Scala Stream ou ao Clojure lazy seq .
fonte
O Java 8 Streams permite o processamento de coleções realmente grandes de maneira eficiente, enquanto aproveita arquiteturas multicore. Por outro lado, o RxJava é thread único por padrão (sem Agendadores). Portanto, o RxJava não aproveitará as máquinas com vários núcleos, a menos que você mesmo codifique essa lógica.
fonte