É possível especificar um conjunto de encadeamentos personalizado para o fluxo paralelo do Java 8 ? Não consigo encontrá-lo em lugar algum.
Imagine que eu tenho um aplicativo de servidor e gostaria de usar fluxos paralelos. Mas o aplicativo é grande e multiencadeado, então eu quero compartimentá-lo. Eu não quero uma tarefa de execução lenta em um módulo das tarefas applicationblock de outro módulo.
Se não posso usar conjuntos de encadeamentos diferentes para módulos diferentes, significa que não posso usar fluxos paralelos com segurança na maioria das situações do mundo real.
Tente o seguinte exemplo. Existem algumas tarefas intensivas da CPU executadas em threads separados. As tarefas utilizam fluxos paralelos. A primeira tarefa é interrompida, portanto, cada etapa leva 1 segundo (simulado pela suspensão do encadeamento). O problema é que outros threads ficam presos e aguardam a conclusão da tarefa quebrada. Este é um exemplo artificial, mas imagine um aplicativo de servlet e alguém enviando uma tarefa de longa execução para o pool de junção de bifurcação compartilhada.
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
Respostas:
Na verdade, há um truque de como executar uma operação paralela em um pool de junção de forquilha específico. Se você executá-lo como uma tarefa em um pool de junção de bifurcação, ele permanece lá e não usa o comum.
O truque é baseado no ForkJoinTask.fork que especifica: "Organiza a execução assíncrona dessa tarefa no pool em que a tarefa atual está sendo executada, se aplicável, ou usando o ForkJoinPool.commonPool () se não for inForkJoinPool ()"
fonte
ForkJoinPool
ou isso é um detalhe de implementação? Um link para a documentação seria bom.ForkJoinPool
instância deve sershutdown()
quando não for mais necessária para evitar um vazamento de thread. (exemplo)Os fluxos paralelos usam o padrão
ForkJoinPool.commonPool
que, por padrão, possui menos um encadeamento como os processadores , conforme retornado porRuntime.getRuntime().availableProcessors()
(Isso significa que os fluxos paralelos usam todos os seus processadores porque também usam o encadeamento principal):Isso também significa que se você tiver aninhado fluxos paralelos ou vários fluxos paralelos iniciados simultaneamente, todos compartilharão o mesmo pool. Vantagem: você nunca usará mais do que o padrão (número de processadores disponíveis). Desvantagem: você pode não ter "todos os processadores" atribuídos a cada fluxo paralelo iniciado (se houver mais de um). (Aparentemente, você pode usar um ManagedBlocker para contornar isso.)
Para alterar a maneira como os fluxos paralelos são executados, você pode
yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();
ouSystem.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")
para um paralelismo de destino de 20 threads. No entanto, isso não funciona mais após o patch suportado https://bugs.openjdk.java.net/browse/JDK-8190974 .Exemplo deste último na minha máquina que possui 8 processadores. Se eu executar o seguinte programa:
A saída é:
Assim, você pode ver que o fluxo paralelo processa 8 itens por vez, ou seja, usa 8 threads. No entanto, se eu descomentar a linha comentada, a saída é:
Desta vez, o fluxo paralelo usou 20 threads e todos os 20 elementos no fluxo foram processados simultaneamente.
fonte
commonPool
verdade, tem um a menos queavailableProcessors
, resultando em paralelismo total igual aavailableProcessors
porque o encadeamento de chamada conta como um.ForkJoinTask
. Para imitarparallel()
get()
é necessário:stream.parallel().forEach(soSomething)).get();
ForkJoinPool.submit(() -> stream.forEach(...))
executará minhas ações do Stream com o dadoForkJoinPool
. Eu esperaria que todo o Stream-Action fosse executado no ForJoinPool como uma ação, mas internamente ainda usando o ForkJoinPool padrão / comum. Onde você viu que o ForkJoinPool.submit () faria o que você diz que faz?Como alternativa ao truque de acionar a computação paralela dentro de seu próprio forkJoinPool, você também pode passar esse pool para o método CompletableFuture.supplyAsync, como em:
fonte
A solução original (configurando a propriedade paralelismo comum ForkJoinPool) não funciona mais. Observando os links na resposta original, uma atualização que quebra isso foi portada novamente para Java 8. Como mencionado nos encadeamentos vinculados, não foi garantido que esta solução funcionasse para sempre. Com base nisso, a solução é a solução forkjoinpool.submit with .get discutida na resposta aceita. Acho que o backport também corrige a falta de confiabilidade dessa solução.
fonte
ForkJoinPool.commonPool().getParallelism()
no modo de depuração.unreported exception InterruptedException; must be caught or declared to be thrown
mesmo com todas ascatch
exceções no loop.Podemos alterar o paralelismo padrão usando a seguinte propriedade:
que pode ser configurado para usar mais paralelismo.
fonte
Para medir o número real de threads usados, você pode verificar
Thread.activeCount()
:Isso pode produzir em uma CPU de 4 núcleos uma saída como:
Sem
.parallel()
ele dá:fonte
Até agora, usei as soluções descritas nas respostas desta pergunta. Agora, criei uma pequena biblioteca chamada Parallel Stream Support para isso:
Mas, como o @PabloMatiasGomez apontou nos comentários, existem desvantagens no mecanismo de divisão de fluxos paralelos, que depende muito do tamanho do pool comum. Consulte O fluxo paralelo de um HashSet não é executado em paralelo .
Estou usando esta solução apenas para ter pools separados para diferentes tipos de trabalho, mas não posso definir o tamanho do pool comum como 1, mesmo que não o use.
fonte
Nota: Parece haver uma correção implementada no JDK 10 que garante que o Conjunto de Encadeamentos Customizados use o número esperado de encadeamentos.
A execução de fluxo paralelo em um ForkJoinPool personalizado deve obedecer ao paralelismo https://bugs.openjdk.java.net/browse/JDK-8190974
fonte
Tentei o ForkJoinPool personalizado da seguinte maneira para ajustar o tamanho do pool:
Aqui está a saída dizendo que o pool está usando mais threads do que o padrão 4 .
Mas, na verdade, há um estranho , quando tentei obter o mesmo resultado usando
ThreadPoolExecutor
o seguinte:mas eu falhei.
Ele só vai começar a parallelStream em uma nova linha e, em seguida, tudo o resto é apenas o mesmo, o que mais uma vez prova que o
parallelStream
usará o ForkJoinPool para começar seus tópicos criança.fonte
Vá para obter o AbacusUtil . O número do encadeamento pode ser especificado para o fluxo paralelo. Aqui está o código de exemplo:
Divulgação : Sou desenvolvedor do AbacusUtil.
fonte
Se você não deseja confiar em hacks de implementação, sempre há uma maneira de conseguir o mesmo, implementando coletores personalizados que combinarão
map
ecollect
semântica ... e você não estaria limitado ao ForkJoinPool:Felizmente, isso já foi feito aqui e está disponível no Maven Central: http://github.com/pivovarit/parallel-collectors
Disclaimer: Eu escrevi e assumo a responsabilidade.
fonte
Se você não se importa em usar uma biblioteca de terceiros, com o cyclops-react, você pode misturar Streams sequenciais e paralelos no mesmo pipeline e fornecer ForkJoinPools personalizados. Por exemplo
Ou se desejássemos continuar processando dentro de um fluxo sequencial
[Divulgação Sou o principal desenvolvedor do cyclops-react]
fonte
Se você não precisa de um ThreadPool personalizado, mas deseja limitar o número de tarefas simultâneas, pode usar:
(A pergunta duplicada solicitando isso está bloqueada, por favor, me carregue aqui)
fonte
você pode tentar implementar esse ForkJoinWorkerThreadFactory e injetar na classe Fork-Join.
você pode usar esse construtor de pool de junção de garfo para fazer isso.
notas: - 1. se você usar isso, leve em consideração que, com base na implementação de novos encadeamentos, o planejamento da JVM será afetado, que geralmente agenda encadeamentos de junção de forquilha para núcleos diferentes (tratados como um encadeamento computacional). 2. O agendamento de tarefas por junção forçada a threads não será afetado. 3. Ainda não descobri como o fluxo paralelo seleciona os segmentos da junção de forquilha (não foi possível encontrar a documentação adequada), então tente usar uma fábrica threadNaming diferente para garantir que os segmentos no fluxo paralelo estejam sendo selecionados de customThreadFactory que você fornece. 4. commonThreadPool não usará este customThreadFactory.
fonte