Fluxos Java infinitos paralelos ficam sem memória

16

Estou tentando entender por que o seguinte programa Java fornece um OutOfMemoryError, enquanto o programa correspondente sem .parallel().

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .limit(100_000_000)
    .sum()
);

Eu tenho duas perguntas:

  1. Qual é o resultado pretendido deste programa?

    Sem .parallel()parece que isso simplesmente gera, o sum(1+2+3+...)que significa que simplesmente "fica preso" no primeiro fluxo no flatMap, o que faz sentido.

    Com o paralelo, não sei se há um comportamento esperado, mas meu palpite seria que, de alguma forma, intercalou os primeiros nou mais fluxos, onde nestá o número de trabalhadores paralelos. Também pode ser um pouco diferente com base no comportamento de fragmentação / buffer.

  2. O que faz com que fique sem memória? Estou tentando entender especificamente como esses fluxos são implementados sob o capô.

    Suponho que algo bloqueie o fluxo, para que ele nunca termine e possa se livrar dos valores gerados, mas não sei em que ordem as coisas são avaliadas e onde ocorre o buffer.

Editar: Caso seja relevante, estou usando o Java 11.

Editt 2: Aparentemente, a mesma coisa acontece até para o programa simples IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum(), por isso pode ter a ver com a preguiça de, em limitvez de flatMap.

Thomas Ahle
fonte
parallel () usa internamente o ForkJoinPool. Eu acho que o ForkJoin Framework está em Java a partir do Java 7
aravind

Respostas:

9

Você diz " mas não sei exatamente em que ordem as coisas são avaliadas e onde ocorre o armazenamento em buffer ", que é exatamente o que são os fluxos paralelos. A ordem da avaliação não é especificada.

Um aspecto crítico do seu exemplo é o .limit(100_000_000). Isso implica que a implementação não pode somar valores arbitrários, mas deve somar os primeiros 100.000.000 de números. Observe que na implementação de referência,.unordered().limit(100_000_000) não altera o resultado, o que indica que não há implementação especial para o caso não ordenado, mas esse é um detalhe da implementação.

Agora, quando os encadeamentos do trabalhador processam os elementos, eles não podem apenas resumir, pois precisam saber quais elementos podem consumir, o que depende de quantos elementos estão precedendo sua carga de trabalho específica. Como esse fluxo não conhece os tamanhos, isso só pode ser conhecido quando os elementos do prefixo foram processados, o que nunca acontece para fluxos infinitos. Portanto, os encadeamentos de trabalho mantêm o buffer no momento, essas informações ficam disponíveis.

Em princípio, quando um segmento de trabalho sabe que processa o bloco de trabalho mais à esquerda, ele pode somar os elementos imediatamente, contá-los e sinalizar o fim ao atingir o limite. Portanto, o fluxo pode terminar, mas isso depende de muitos fatores.

No seu caso, um cenário plausível é que os outros threads de trabalho são mais rápidos na alocação de buffers do que o trabalho mais à esquerda está contando. Nesse cenário, alterações sutis no tempo podem fazer com que o fluxo retorne ocasionalmente com um valor.

Quando diminuímos a velocidade de todos os threads de trabalho, exceto aquele que processa a parte mais à esquerda, podemos fazer o fluxo terminar (pelo menos na maioria das execuções):

System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);

¹ Estou seguindo uma sugestão de Stuart Marks para usar a ordem da esquerda para a direita ao falar sobre a ordem de encontro e não a ordem de processamento.

Holger
fonte
Resposta muito boa! Gostaria de saber se existe o risco de todos os threads começarem a executar as operações flatMap, e nenhum deles ser alocado para realmente esvaziar os buffers (somatório)? No meu caso de uso real, os fluxos infinitos são arquivos muito grandes para serem armazenados na memória. Gostaria de saber como posso reescrever o fluxo para manter baixo o uso de memória?
Thomas Ahle
11
Você está usando Files.lines(…)? Foi melhorado significativamente no Java 9.
Holger
11
É o que ele faz no Java 8. Nos JREs mais recentes, ele ainda retorna BufferedReader.lines()em determinadas circunstâncias (não no sistema de arquivos padrão, em um conjunto de caracteres especial ou no tamanho maior que Integer.MAX_FILES). Se um deles se aplicar, uma solução personalizada pode ajudar. Isso valeria a pena um novo Q & A ...
Holger
11
Integer.MAX_VALUE, claro ...
Holger
11
Qual é o fluxo externo, um fluxo de arquivos? Tem um tamanho previsível?
Holger
5

Meu melhor palpite é que a adição parallel()muda o comportamento interno do flatMap()que problemas já vinha sendo avaliada preguiçosamente antes .

O OutOfMemoryErrorerro que você está recebendo foi relatado em [JDK-8202307] Obtendo um java.lang.OutOfMemoryError: espaço de heap Java ao chamar Stream.iterator (). Next () em um fluxo que usa um fluxo infinito / muito grande no flatMap . Se você olhar para o ticket, é mais ou menos o mesmo rastreamento de pilha que você está recebendo. O ticket foi fechado porque não será corrigido pelo seguinte motivo:

Os métodos iterator()e spliterator()são "hachuras de escape" a serem usadas quando não é possível usar outras operações. Eles têm algumas limitações porque transformam o que é um modelo push da implementação de fluxo em um modelo pull. Essa transição requer buffer em certos casos, como quando um elemento é (plano) mapeado para dois ou mais elementos . Isso complicaria significativamente a implementação do fluxo, provavelmente à custa de casos comuns, para suportar uma noção de contrapressão para comunicar quantos elementos extrair camadas aninhadas de produção de elementos.

Karol Dowbecki
fonte
Isto é muito interessante! Faz sentido que a transição push / pull exija buffer, o que pode esgotar a memória. No entanto, no meu caso, parece que usar apenas push deve funcionar bem e simplesmente descartar os elementos restantes conforme eles aparecem? Ou talvez você esteja dizendo que o flapmap faz com que um iterador seja criado?
Thomas Ahle
3

OOME é causado não pelo fluxo ser infinito, mas pelo fato de não ser .

Ou seja, se você comentar .limit(...), ele nunca ficará sem memória - mas é claro que também nunca terminará.

Depois de dividido, o fluxo só pode acompanhar o número de elementos se eles estiverem acumulados em cada thread (parece que o acumulador real é Spliterators$ArraySpliterator#array ).

Parece que você pode reproduzi-lo sem flatMap, basta executar o seguinte com -Xmx128m:

    System.out.println(Stream
            .iterate(1, i -> i + 1)
            .parallel()
      //    .flatMap(n -> Stream.iterate(n, i -> i+n))
            .mapToInt(Integer::intValue)
            .limit(100_000_000)
            .sum()
    );

No entanto, depois de comentar o limit() , ele deve funcionar bem até você decidir poupar seu laptop.

Além dos detalhes reais da implementação, aqui está o que eu acho que está acontecendo:

Com limit, o sumredutor deseja que os primeiros elementos X sejam somados, para que nenhum thread possa emitir somas parciais. Cada "fatia" (thread) precisará acumular elementos e passá-los. Sem limite, não há tal restrição, de modo que cada "fatia" apenas calcula a soma parcial dos elementos que obtém (para sempre), assumindo que, eventualmente, emitirá o resultado.

Costi Ciudatu
fonte
O que você quer dizer com "uma vez dividido"? O limite o divide de alguma forma?
Thomas Ahle 31/01
O @ThomasAhle parallel()usará ForkJoinPoolinternamente para obter paralelismo. O Spliteratorserá usado para atribuir trabalho a cada ForkJointarefa, acho que podemos chamar a unidade de trabalho aqui como "dividida".
Karol Dowbecki 31/01
Mas por que isso acontece apenas com limite?
Thomas Ahle 31/01
@ThomasAhle Editei a resposta com meus dois centavos.
Costi Ciudatu 31/01
11
@ThomasAhle define um ponto de interrupção Integer.sum(), usado pelo IntStream.sumredutor. Você verá que a versão sem limite chama essa função o tempo todo, enquanto a versão limitada nunca é chamada antes do OOM.
Costi Ciudatu 31/01