AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
.map(record -> new Record(recordNumber.incrementAndGet(), record))
.parallel()
.filter(record -> doSomeOperation())
.findFirst()
Quando escrevi isso, assumi que os threads seriam gerados apenas na chamada do mapa, pois o paralelo é colocado após o mapa. Mas algumas linhas no arquivo estavam recebendo números de registros diferentes para cada execução.
Li a documentação oficial do fluxo Java e alguns sites para entender como os fluxos funcionam sob o capô.
Algumas questões:
O fluxo paralelo de Java funciona com base no SplitIterator , que é implementado por todas as coleções como ArrayList, LinkedList etc. Quando construímos um fluxo paralelo a partir dessas coleções, o iterador de divisão correspondente será usado para dividir e iterar a coleção. Isso explica por que o paralelismo aconteceu no nível da fonte de entrada original (linhas de arquivo) e não no resultado do mapa (ou seja, Record pojo). Meu entendimento está correto?
No meu caso, a entrada é um fluxo de E / S de arquivo. Qual iterador dividido será usado?
Não importa onde colocamos
parallel()
no pipeline. A fonte de entrada original será sempre dividida e as operações intermediárias restantes serão aplicadas.Nesse caso, o Java não deve permitir que os usuários coloquem operações paralelas em qualquer lugar do pipeline, exceto na fonte original. Porque está dando um entendimento errado para quem não sabe como o java stream funciona internamente. Eu sei que a
parallel()
operação teria sido definida para o tipo de objeto Stream e, portanto, está funcionando dessa maneira. Mas, é melhor fornecer uma solução alternativa.No trecho de código acima, estou tentando adicionar um número de linha a cada registro no arquivo de entrada e, portanto, ele deve ser solicitado. No entanto, quero aplicar
doSomeOperation()
em paralelo, pois é uma lógica de peso pesado. A única maneira de conseguir é escrever meu próprio iterador dividido personalizado. Existe alguma outra maneira?
fonte
parallel()
nada mais é do que uma solicitação geral de modificador aplicada ao objeto de fluxo subjacente. Lembre-se de que existe apenas um fluxo de origem se você não aplicar operações finais ao canal, ou seja, desde que nada seja "executado". Dito isto, você está basicamente questionando as opções de design do Java. Qual é a opinião e não podemos realmente ajudar com isso.Stream
diretamente na interface e, devido à boa cascata, todas as operações são devolvidasStream
. Imagine que alguém queira lhe dar uma,Stream
mas já aplicou algumas operações semelhantesmap
a ela. Você, como usuário, ainda deseja decidir se deve executá-lo em paralelo ou não. Portanto, você deve poder ligarparallel()
ainda, embora o fluxo já exista.flatMap
ou se executasse métodos inseguros ou similares.Path
estiver no sistema de arquivos local e você estiver usando um JDK recente, o spliterator terá uma capacidade de processamento paralelo melhor do que múltiplos em lotes de 1024. Mas a divisão equilibrada pode até ser contraproducente em algunsfindFirst
cenários ...Respostas:
O fluxo inteiro é paralelo ou seqüencial. Não selecionamos um subconjunto de operações para executar sequencialmente ou em paralelo.
Como você mencionou, os fluxos paralelos usam iteradores divididos. Claramente, isso é para particionar os dados antes que as operações comecem a ser executadas.
Olhando a fonte, vejo que ela usa
java.nio.file.FileChannelLinesSpliterator
Direita. Você pode até ligar
parallel()
esequential()
várias vezes. O último invocado vencerá. Quando chamamosparallel()
, definimos isso para o fluxo retornado; e, como mencionado acima, todas as operações são executadas sequencialmente ou em paralelo.Isso se torna uma questão de opiniões. Eu acho que Zabuza dá um bom motivo para apoiar a escolha dos designers do JDK.
Isso depende das suas operações
findFirst()
é a sua operação real do terminal, você nem precisa se preocupar com a execução paralela, porque dedoSomething()
qualquer maneira não haverá muitas chamadas (findFirst()
está em curto-circuito)..parallel()
de fato, pode fazer com que mais de um elemento seja processado, enquanto quefindFirst()
em um fluxo seqüencial impediria isso.Se a operação do terminal não criar muitos dados, talvez você possa criar seus
Record
objetos usando um fluxo sequencial e processe o resultado em paralelo:Se o seu pipeline carrega muitos dados na memória (que pode ser o motivo pelo qual você está usando
Files.lines()
), talvez seja necessário um iterador dividido personalizado. Antes de ir para lá, porém, eu procurava outras opções (como salvar linhas com uma coluna de identificação - para começar - essa é apenas a minha opinião).Eu também tentaria processar registros em lotes menores, como este:
Isso é executado
doSomeOperation()
em paralelo sem carregar todos os dados na memória. Mas observe quebatchSize
será necessário pensar um pouco.fonte
Spliterator
implementação personalizada não seria mais complicada do que isso, permitindo um processamento paralelo mais eficiente ...parallelStream
operações internas possui uma sobrecarga fixa para iniciar a operação e aguardar o resultado final, enquanto se limita a um paralelismo debatchSize
. Primeiro, você precisa de um número múltiplo de núcleos de CPU atualmente disponíveis para evitar encadeamentos inativos. Então, o número deve ser alto o suficiente para compensar a sobrecarga fixa, mas quanto maior o número, maior a pausa imposta pela operação de leitura seqüencial que ocorre antes mesmo do início do processamento paralelo.Stream.generate
produz um fluxo não ordenado, que não funciona com os casos de uso pretendidos do OP, comofindFirst()
. Por outro lado, um único fluxo paralelo com um spliterator que retorna pedaços emtrySplit
trabalhos diretamente e permite que os threads de trabalho processem o próximo pedaço sem aguardar a conclusão do anterior.findFirst()
operação processe apenas um pequeno número de elementos. A primeira correspondência ainda pode ocorrer após o processamento de 90% de todos os elementos. Além disso, ao ter dez milhões de linhas, até encontrar uma correspondência após 10% ainda exige o processamento de um milhão de linhas.O design original do Stream incluía a ideia de dar suporte aos estágios subsequentes do pipeline com diferentes configurações de execução paralela, mas essa ideia foi abandonada. A API pode resultar desse momento, mas, por outro lado, um design de API que força o chamador a tomar uma única decisão inequívoca para execução paralela ou sequencial seria muito mais complicado.
O real
Spliterator
em uso porFiles.lines(…)
depende da implementação. No Java 8 (Oracle ou OpenJDK), você sempre obtém o mesmo que comBufferedReader.lines()
. Nos JDKs mais recentes, se oPath
pertence ao sistema de arquivos padrão e o charset é um dos suportados para esse recurso, você obtém um Stream com umaSpliterator
implementação dedicada , ojava.nio.file.FileChannelLinesSpliterator
. Se as pré-condições não forem atendidas, você obtém o mesmo que comBufferedReader.lines()
, que ainda é baseado em umaIterator
implementação internaBufferedReader
e via empacotadaSpliterators.spliteratorUnknownSize
.Sua tarefa específica é melhor gerenciada com um costume
Spliterator
que pode executar a numeração de linha diretamente na origem, antes do processamento paralelo, para permitir o processamento paralelo subsequente sem restrições.fonte
E a seguir é uma demonstração simples de quando a aplicação de paralelo é aplicada. A saída do peek mostra claramente a diferença entre os dois exemplos. Nota: A
map
chamada é lançada apenas para adicionar outro método antes deparallel
.fonte