Estou tentando entender por que o seguinte programa Java fornece um OutOfMemoryError
, enquanto o programa correspondente sem .parallel()
.
System.out.println(Stream
.iterate(1, i -> i+1)
.parallel()
.flatMap(n -> Stream.iterate(n, i -> i+n))
.mapToInt(Integer::intValue)
.limit(100_000_000)
.sum()
);
Eu tenho duas perguntas:
Qual é o resultado pretendido deste programa?
Sem
.parallel()
parece que isso simplesmente gera, osum(1+2+3+...)
que significa que simplesmente "fica preso" no primeiro fluxo no flatMap, o que faz sentido.Com o paralelo, não sei se há um comportamento esperado, mas meu palpite seria que, de alguma forma, intercalou os primeiros
n
ou mais fluxos, onden
está o número de trabalhadores paralelos. Também pode ser um pouco diferente com base no comportamento de fragmentação / buffer.O que faz com que fique sem memória? Estou tentando entender especificamente como esses fluxos são implementados sob o capô.
Suponho que algo bloqueie o fluxo, para que ele nunca termine e possa se livrar dos valores gerados, mas não sei em que ordem as coisas são avaliadas e onde ocorre o buffer.
Editar: Caso seja relevante, estou usando o Java 11.
Editt 2: Aparentemente, a mesma coisa acontece até para o programa simples IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum()
, por isso pode ter a ver com a preguiça de, em limit
vez de flatMap
.
fonte
Respostas:
Você diz " mas não sei exatamente em que ordem as coisas são avaliadas e onde ocorre o armazenamento em buffer ", que é exatamente o que são os fluxos paralelos. A ordem da avaliação não é especificada.
Um aspecto crítico do seu exemplo é o
.limit(100_000_000)
. Isso implica que a implementação não pode somar valores arbitrários, mas deve somar os primeiros 100.000.000 de números. Observe que na implementação de referência,.unordered().limit(100_000_000)
não altera o resultado, o que indica que não há implementação especial para o caso não ordenado, mas esse é um detalhe da implementação.Agora, quando os encadeamentos do trabalhador processam os elementos, eles não podem apenas resumir, pois precisam saber quais elementos podem consumir, o que depende de quantos elementos estão precedendo sua carga de trabalho específica. Como esse fluxo não conhece os tamanhos, isso só pode ser conhecido quando os elementos do prefixo foram processados, o que nunca acontece para fluxos infinitos. Portanto, os encadeamentos de trabalho mantêm o buffer no momento, essas informações ficam disponíveis.
Em princípio, quando um segmento de trabalho sabe que processa o bloco de trabalho mais à esquerda, ele pode somar os elementos imediatamente, contá-los e sinalizar o fim ao atingir o limite. Portanto, o fluxo pode terminar, mas isso depende de muitos fatores.
No seu caso, um cenário plausível é que os outros threads de trabalho são mais rápidos na alocação de buffers do que o trabalho mais à esquerda está contando. Nesse cenário, alterações sutis no tempo podem fazer com que o fluxo retorne ocasionalmente com um valor.
Quando diminuímos a velocidade de todos os threads de trabalho, exceto aquele que processa a parte mais à esquerda, podemos fazer o fluxo terminar (pelo menos na maioria das execuções):
¹ Estou seguindo uma sugestão de Stuart Marks para usar a ordem da esquerda para a direita ao falar sobre a ordem de encontro e não a ordem de processamento.
fonte
Files.lines(…)
? Foi melhorado significativamente no Java 9.BufferedReader.lines()
em determinadas circunstâncias (não no sistema de arquivos padrão, em um conjunto de caracteres especial ou no tamanho maior queInteger.MAX_FILES
). Se um deles se aplicar, uma solução personalizada pode ajudar. Isso valeria a pena um novo Q & A ...Integer.MAX_VALUE
, claro ...Meu melhor palpite é que a adição
parallel()
muda o comportamento interno doflatMap()
que problemas já vinha sendo avaliada preguiçosamente antes .O
OutOfMemoryError
erro que você está recebendo foi relatado em [JDK-8202307] Obtendo um java.lang.OutOfMemoryError: espaço de heap Java ao chamar Stream.iterator (). Next () em um fluxo que usa um fluxo infinito / muito grande no flatMap . Se você olhar para o ticket, é mais ou menos o mesmo rastreamento de pilha que você está recebendo. O ticket foi fechado porque não será corrigido pelo seguinte motivo:fonte
OOME é causado não pelo fluxo ser infinito, mas pelo fato de não ser .
Ou seja, se você comentar
.limit(...)
, ele nunca ficará sem memória - mas é claro que também nunca terminará.Depois de dividido, o fluxo só pode acompanhar o número de elementos se eles estiverem acumulados em cada thread (parece que o acumulador real é
Spliterators$ArraySpliterator#array
).Parece que você pode reproduzi-lo sem
flatMap
, basta executar o seguinte com-Xmx128m
:No entanto, depois de comentar o
limit()
, ele deve funcionar bem até você decidir poupar seu laptop.Além dos detalhes reais da implementação, aqui está o que eu acho que está acontecendo:
Com
limit
, osum
redutor deseja que os primeiros elementos X sejam somados, para que nenhum thread possa emitir somas parciais. Cada "fatia" (thread) precisará acumular elementos e passá-los. Sem limite, não há tal restrição, de modo que cada "fatia" apenas calcula a soma parcial dos elementos que obtém (para sempre), assumindo que, eventualmente, emitirá o resultado.fonte
parallel()
usaráForkJoinPool
internamente para obter paralelismo. OSpliterator
será usado para atribuir trabalho a cadaForkJoin
tarefa, acho que podemos chamar a unidade de trabalho aqui como "dividida".Integer.sum()
, usado peloIntStream.sum
redutor. Você verá que a versão sem limite chama essa função o tempo todo, enquanto a versão limitada nunca é chamada antes do OOM.