Eu gostaria de duplicar um fluxo do Java 8 para poder lidar com ele duas vezes. Eu posso collect
como uma lista e obter novos fluxos a partir disso;
// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff
Mas acho que deveria haver uma maneira mais eficiente / elegante.
Existe uma maneira de copiar o fluxo sem transformá-lo em uma coleção?
Na verdade, estou trabalhando com um fluxo de Either
s, então, quero processar a projeção esquerda de uma maneira antes de passar para a projeção direita e lidar com essa outra maneira. Mais ou menos assim (com o qual até agora sou forçado a usar o toList
truque).
List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());
Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );
Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );
java
lambda
java-8
java-stream
Toby
fonte
fonte
Respostas:
Eu acho que sua suposição sobre eficiência é meio atrasada. Você obtém esse enorme retorno de eficiência se quiser usar os dados apenas uma vez, porque não precisa armazená-los, e os fluxos oferecem otimizações poderosas de "fusão de loop" que permitem que você flua os dados inteiros de maneira eficiente pelo pipeline.
Se você deseja reutilizar os mesmos dados, por definição, é necessário gerá-los duas vezes (deterministicamente) ou armazená-los. Se já estiver em uma coleção, ótimo; iterá-lo duas vezes é barato.
Nós experimentamos o design com "fluxos bifurcados". O que descobrimos foi que apoiar isso tinha custos reais; sobrecarregou o caso comum (use uma vez) às custas do caso incomum. O grande problema foi lidar com "o que acontece quando os dois pipelines não consomem dados na mesma taxa". Agora você voltou ao buffer de qualquer maneira. Essa era uma característica que claramente não carregava seu peso.
Se você deseja operar os mesmos dados repetidamente, armazene-os ou estruture suas operações como Consumidores e faça o seguinte:
Você também pode procurar na biblioteca RxJava, pois seu modelo de processamento se presta melhor a esse tipo de "fluxo de bifurcação".
fonte
toList
) para poder processá-los (oEither
caso sendo o exemplo)?Você pode usar uma variável local com a
Supplier
para configurar partes comuns do pipeline de fluxo.De http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/ :
fonte
Supplier
se o modeloStream
for construído de maneira "cara", você paga esse custo por cada chamada paraSupplier.get()
. ou seja, se uma consulta ao banco de dados ... essa consulta é feita a cada vezSet<Integer>
usocollect(Collectors.toSet())
... e fazer algumas operações nisso. Eu queriamax()
e se um valor específico estivesse definido como duas operações ...filter(d -> d == -1).count() == 1;
Use a
Supplier
para produzir o fluxo para cada operação de encerramento.Sempre que você precisar de um fluxo dessa coleção, use
streamSupplier.get()
para obter um novo fluxo.Exemplos:
streamSupplier.get().anyMatch(predicate);
streamSupplier.get().allMatch(predicate2);
fonte
Implementamos um
duplicate()
método para fluxos no jOOλ , uma biblioteca de código aberto que criamos para aprimorar os testes de integração para o jOOQ . Basicamente, você pode escrever:Internamente, há um buffer armazenando todos os valores que foram consumidos de um fluxo, mas não do outro. Isso é provavelmente o mais eficiente possível, se seus dois fluxos forem consumidos aproximadamente na mesma taxa e se você puder viver com a falta de segurança de threads .
Veja como o algoritmo funciona:
Mais código fonte aqui
Tuple2
é, provavelmente, como o seuPair
tipo, enquantoSeq
éStream
com algumas melhorias.fonte
Tuple2<Seq<A>>, Seq<A>> t = duplicate(stream); long count = t.collect(counting()); List<A> list = t.collect(toList());
, é melhor usarTuple2<Long, List<A>> t = stream.collect(Tuple.collectors(counting(), toList()));
. O uso deCollectors.mapping/reducing
um pode expressar outras operações de fluxo como coletores e elementos de processo de maneira bastante diferente, criando uma tupla resultante única. Portanto, em geral, você pode fazer muitas coisas consumindo o fluxo uma vez sem duplicação e será compatível com paralelos.offer()
/poll()
API, mas umArrayDeque
pode fazer o mesmo.Você pode criar um fluxo de executáveis (por exemplo):
Onde
failure
esuccess
as operações a serem aplicadas. No entanto, isso criará alguns objetos temporários e pode não ser mais eficiente do que iniciar uma coleção e transmiti-la / iterá-la duas vezes.fonte
Outra maneira de lidar com os elementos várias vezes é usar Stream.peek (Consumer) :
peek(Consumer)
pode ser encadeado quantas vezes for necessário.fonte
O cyclops-react , uma biblioteca na qual contribuo, possui um método estático que permite duplicar um fluxo (e retorna uma tupla de fluxos).
Veja os comentários, há uma penalidade de desempenho que será incorrida ao usar duplicado em um fluxo existente. Uma alternativa mais eficiente seria usar o Streamable: -
Há também uma classe Streamable (lenta) que pode ser construída a partir de um Stream, Iterable ou Array e reproduzida várias vezes.
AsStreamable.synchronizedFromStream (stream) - pode ser usado para criar um Streamable que preencherá lentamente sua coleção de backups, de maneira que possa ser compartilhada entre os threads. Streamable.fromStream (stream) não sofrerá nenhuma sobrecarga de sincronização.
fonte
List<Integer> list = stream.collect(Collectors.toList()); streams = new Tuple2<>(list.stream(), list.stream())
(como sugere a OP). Além disso, divulgue explicitamente na resposta que você é o autor do cyclop-streams. Leia isto .Para esse problema específico, você também pode usar o particionamento. Algo como
fonte
Podemos fazer uso do Stream Builder no momento da leitura ou iteração de um fluxo. Aqui está o documento de Stream Builder .
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.Builder.html
Caso de uso
Digamos que temos fluxo de funcionários e precisamos usá-lo para gravar dados de funcionários no arquivo do Excel e, em seguida, atualizar a coleção / tabela de funcionários [Este é apenas um caso de uso para mostrar o uso do Stream Builder]:
fonte
Eu tive um problema semelhante e pude pensar em três estruturas intermediárias diferentes para criar uma cópia do fluxo: a
List
, uma matriz e aStream.Builder
. Eu escrevi um pequeno programa de benchmark, que sugeria que, do ponto de vista do desempenho,List
era cerca de 30% mais lento que os outros dois, que eram bastante semelhantes.A única desvantagem da conversão para uma matriz é que é complicado se o seu tipo de elemento for um tipo genérico (que no meu caso era); portanto, eu prefiro usar a
Stream.Builder
.Acabei escrevendo uma pequena função que cria um
Collector
:Posso então fazer uma cópia de qualquer fluxo
str
, fazendo ostr.collect(copyCollector())
que parece bastante com o uso idiomático dos fluxos.fonte