Quais são os benefícios de usar a nova estrutura de junção / junção, simplesmente dividindo a grande tarefa em subtarefas N no início, enviando-as para um pool de encadeamentos em cache (dos Executores ) e aguardando a conclusão de cada tarefa? Não vejo como o uso da abstração fork / join simplifica o problema ou torna a solução mais eficiente do que tivemos há anos.
Por exemplo, o algoritmo de desfoque paralelo no exemplo do tutorial pode ser implementado assim:
public class Blur implements Runnable {
private int[] mSource;
private int mStart;
private int mLength;
private int[] mDestination;
private int mBlurWidth = 15; // Processing window size, should be odd.
public ForkBlur(int[] src, int start, int length, int[] dst) {
mSource = src;
mStart = start;
mLength = length;
mDestination = dst;
}
public void run() {
computeDirectly();
}
protected void computeDirectly() {
// As in the example, omitted for brevity
}
}
Divida no início e envie tarefas para um conjunto de encadeamentos:
// source image pixels are in src
// destination image pixels are in dst
// threadPool is a (cached) thread pool
int maxSize = 100000; // analogous to F-J's "sThreshold"
List<Future> futures = new ArrayList<Future>();
// Send stuff to thread pool:
for (int i = 0; i < src.length; i+= maxSize) {
int size = Math.min(maxSize, src.length - i);
ForkBlur task = new ForkBlur(src, i, size, dst);
Future f = threadPool.submit(task);
futures.add(f);
}
// Wait for all sent tasks to complete:
for (Future future : futures) {
future.get();
}
// Done!
As tarefas vão para a fila do conjunto de encadeamentos, a partir da qual são executadas quando os encadeamentos de trabalho se tornam disponíveis. Desde que a divisão seja granular o suficiente (para evitar ter que esperar particularmente pela última tarefa) e o conjunto de encadeamentos possua encadeamentos suficientes (pelo menos N de processadores), todos os processadores estarão trabalhando em velocidade máxima até que todo o cálculo seja feito.
Estou esquecendo de algo? Qual é o valor agregado do uso da estrutura fork / join?
Se você tiver n threads ocupados trabalhando com 100% de forma independente, será melhor que n threads em um pool Fork-Join (FJ). Mas isso nunca funciona dessa maneira.
Talvez não seja possível dividir com precisão o problema em n partes iguais. Mesmo se você fizer isso, o agendamento de encadeamentos é algo que não é justo. Você acabará esperando o thread mais lento. Se você tiver várias tarefas, cada uma delas poderá ser executada com paralelismo menor que o n-way (geralmente mais eficiente), e ainda assim até o n-way quando outras tarefas forem concluídas.
Então, por que não cortamos o problema em pedaços do tamanho de FJ e temos um pool de threads trabalhando nisso? O uso típico de FJ corta o problema em pequenos pedaços. Fazer isso em uma ordem aleatória requer muita coordenação no nível do hardware. As despesas gerais seriam um assassino. No FJ, as tarefas são colocadas em uma fila que o thread lê na ordem Last In First Out (LIFO / pilha), e o roubo de trabalho (no trabalho principal, geralmente) é feito First In First Out (FIFO / "fila"). O resultado é que o processamento de matriz longa pode ser feito em grande parte sequencialmente, mesmo que seja dividido em pequenos pedaços. (Também é possível que não seja trivial dividir o problema em pequenos pedaços de tamanho uniforme em um big bang. Diga lidar com uma forma de hierarquia sem balanceamento).
Conclusão: O FJ permite o uso mais eficiente de threads de hardware em situações desiguais, o que sempre será se você tiver mais de um thread.
fonte
maxSize
parâmetro no meu exemplo produziria uma divisão de subtarefa quase semelhante à "divisão binária" no exemplo do FJ (feita dentro docompute()
método, que calcula algo ou envia subtarefas parainvokeAll()
).O objetivo final dos pools de threads e do Fork / Join é o mesmo: ambos desejam utilizar a energia da CPU disponível da melhor maneira possível para obter o máximo rendimento. O rendimento máximo significa que o maior número possível de tarefas deve ser concluído em um longo período de tempo. O que é necessário para fazer isso? (Para o seguinte, assumiremos que não há escassez de tarefas de cálculo: sempre há o suficiente para uma utilização de 100% da CPU. Além disso, eu uso "CPU" de forma equivalente para núcleos ou núcleos virtuais em caso de hiperencadeamento).
Assim, descobrimos que, para obter o rendimento máximo, precisamos ter exatamente o mesmo número de threads que as CPUs. No exemplo de desfoque do Oracle, você pode pegar um pool de threads de tamanho fixo com o número de threads igual ao número de CPUs disponíveis ou usar um pool de threads. Não fará diferença, você está certo!
Então, quando você terá problemas com um pool de threads? Ou seja, se um encadeamento for bloqueado , porque o encadeamento aguarda a conclusão de outra tarefa. Suponha o seguinte exemplo:
O que vemos aqui é um algoritmo que consiste em três etapas A, B e C. A e B podem ser executadas independentemente uma da outra, mas a etapa C precisa do resultado da etapa A e B. O que esse algoritmo faz é enviar a tarefa A para o conjunto de threads e execute a tarefa b diretamente. Depois disso, o encadeamento aguardará a conclusão da tarefa A e continuará com a etapa C. Se A e B forem concluídos ao mesmo tempo, tudo estará bem. Mas e se A demorar mais que B? Isso pode ser porque a natureza da tarefa A determina, mas também pode ser o caso, porque não há encadeamento para a tarefa A disponível no início e a tarefa A precisa esperar. (Se houver apenas uma única CPU disponível e, portanto, o seu conjunto de encadeamentos tiver apenas um único encadeamento, isso poderá causar um impasse, mas por enquanto isso está além do ponto). O ponto é que o thread que acabou de executar a tarefa Bbloqueia o segmento inteiro . Como temos o mesmo número de threads que as CPUs e um thread está bloqueado, isso significa que uma CPU está ociosa .
O fork / join resolve este problema: Na estrutura do fork / join, você escreveria o mesmo algoritmo da seguinte maneira:
Parece o mesmo, não é? No entanto, a pista é que
aTask.join
não irá bloquear . Em vez disso, é aqui que o roubo de trabalho entra em ação : o thread procurará outras tarefas que foram bifurcadas no passado e continuará com elas. Primeiro, ele verifica se as tarefas que se bifurcaram começaram o processamento. Portanto, se A ainda não tiver sido iniciado por outro encadeamento, ele fará A a seguir, caso contrário, verificará a fila de outros encadeamentos e roubará seu trabalho. Depois que essa outra tarefa de outro encadeamento for concluída, ele verificará se A está concluído agora. Se for o algoritmo acima, pode chamarstepC
. Caso contrário, ele procurará mais uma tarefa a ser roubada. Assim, os pools de junção / forquilha podem atingir 100% de utilização da CPU, mesmo diante de ações de bloqueio .No entanto, existe uma armadilha: o roubo de trabalho só é possível para a
join
chamada deForkJoinTask
s. Isso não pode ser feito para ações de bloqueio externas, como aguardar outro encadeamento ou aguardar uma ação de E / S. Então, o que é isso, esperar a conclusão da E / S é uma tarefa comum? Nesse caso, se pudermos adicionar um encadeamento adicional ao pool de Bifurcação / Junção que será interrompido novamente assim que a ação de bloqueio for concluída, será a segunda melhor coisa a fazer. E oForkJoinPool
pode realmente fazer exatamente isso se estivermos usandoManagedBlocker
s.Fibonacci
No JavaDoc for RecursiveTask, há um exemplo para calcular números de Fibonacci usando Fork / Join. Para uma solução recursiva clássica, consulte:
Como é explicado nos JavaDocs, essa é uma maneira bastante simples de calcular números de fibonacci, pois esse algoritmo tem complexidade O (2 ^ n), enquanto maneiras mais simples são possíveis. No entanto, este algoritmo é muito simples e fácil de entender, por isso o mantemos. Vamos supor que queremos acelerar isso com o Fork / Join. Uma implementação ingênua ficaria assim:
As etapas em que esta tarefa está dividida são muito curtas e, portanto, terão um desempenho horrível, mas você pode ver como a estrutura geralmente funciona muito bem: As duas ordens de soma podem ser calculadas independentemente, mas precisamos das duas para criar a versão final. resultado. Então metade é feita em outro segmento. Divirta-se fazendo o mesmo com conjuntos de encadeamentos sem obter um impasse (possível, mas não tão simples).
Apenas para completar: se você realmente deseja calcular os números de Fibonacci usando esta abordagem recursiva, aqui está uma versão otimizada:
Isso mantém as subtarefas muito menores, porque elas são divididas apenas quando
n > 10 && getSurplusQueuedTaskCount() < 2
verdadeiras, o que significa que há significativamente mais de 100 chamadas de método para fazer (n > 10
) e não há muitas tarefas manuais aguardando (getSurplusQueuedTaskCount() < 2
).No meu computador (4 núcleos (8 ao contar o Hyper-threading), a CPU Intel (R) Core i7-2720QM a 2.20 GHz)
fib(50)
leva 64 segundos com a abordagem clássica e apenas 18 segundos com a abordagem Fork / Join, que é um ganho bastante perceptível, embora não tanto quanto teoricamente possível.Resumo
fonte
Forquilha / junção é diferente de um conjunto de encadeamentos porque implementa o roubo de trabalho. De Fork / Join
Digamos que você tenha dois threads e 4 tarefas a, b, c, d que levam 1, 1, 5 e 6 segundos, respectivamente. Inicialmente, aeb são atribuídos ao encadeamento 1 e c e d ao encadeamento 2. Em um conjunto de encadeamentos, isso levaria 11 segundos. Com fork / join, o segmento 1 termina e pode roubar o trabalho do segmento 2, portanto, a tarefa d acabaria sendo executada pelo segmento 1. O segmento 1 executa a, b e d, o segmento 2 apenas c. Tempo total: 8 segundos, não 11.
EDIT: Como Joonas aponta, as tarefas não são necessariamente pré-alocadas a um thread. A idéia da junção / junção é que um encadeamento pode optar por dividir uma tarefa em várias sub-partes. Então, para reafirmar o acima:
Temos duas tarefas (ab) e (cd) que levam 2 e 11 segundos, respectivamente. O segmento 1 começa a executar ab e o divide em duas subtarefas a & b. Da mesma forma com o encadeamento 2, ele se divide em duas subtarefas c & d. Quando a linha 1 termina a & b, ela pode roubar d da linha 2.
fonte
compute()
calcula a tarefa ou a divide em duas subtarefas. A opção escolhida depende apenas do tamanho da tarefa (if (mLength < sThreshold)...
), portanto, é apenas uma maneira elegante de criar um número fixo de tarefas. Para uma imagem de 1000 x 1000, haverá exatamente 16 subtarefas que realmente computam algo. Além disso, haverá 15 (= 16 - 1) tarefas "intermediárias" que apenas geram e invocam subtarefas e não calculam nada.computeDirectly()
método, não há mais como roubar nada. Toda a divisão é feita a priori , pelo menos no exemplo.Todos os que estão acima estão corretos, os benefícios são alcançados pelo trabalho roubado, mas para expandir o porquê disso.
O principal benefício é a coordenação eficiente entre os threads de trabalho. O trabalho deve ser dividido e remontado, o que requer coordenação. Como você pode ver na resposta da AH acima, cada thread tem sua própria lista de trabalho. Uma propriedade importante desta lista é que ela é classificada (grandes tarefas na parte superior e pequenas tarefas na parte inferior). Cada thread executa as tarefas na parte inferior de sua lista e rouba tarefas da parte superior de outras listas de threads.
O resultado disso é:
A maioria dos outros esquemas de divisão e conquista usando conjuntos de encadeamentos exige mais comunicação e coordenação entre encadeamentos.
fonte
Neste exemplo, Bifurcação / Junção não agrega valor, porque a bifurcação não é necessária e a carga de trabalho é dividida igualmente entre os segmentos de trabalho. Forquilha / junção adiciona apenas sobrecarga.
Aqui está um bom artigo sobre o assunto. Citar:
fonte
Outra diferença importante parece ser que, com o FJ, você pode executar várias fases complexas de "junção". Considere a classificação de mesclagem em http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture18.html , haveria muita orquestração necessária para pré-dividir este trabalho. Por exemplo, você precisa fazer o seguinte:
Como você especifica que deve fazer as classificações antes das mesclagens que lhes dizem respeito, etc.
Eu estive procurando a melhor maneira de fazer uma determinada coisa para cada uma das listas de itens. Acho que vou pré-dividir a lista e usar um ThreadPool padrão. FJ parece mais útil quando o trabalho não pode ser pré-dividido em tarefas independentes suficientes, mas pode ser recursivamente dividido em tarefas que são independentes entre si (por exemplo, classificar as metades são independentes, mas mesclar as duas metades classificadas em um todo classificado não é).
fonte
O F / J também possui uma vantagem distinta quando você possui operações caras de mesclagem. Como ele se divide em uma estrutura em árvore, você apenas mescla log2 (n) em oposição a n se mescla à divisão linear de threads. (Isso assume o pressuposto teórico de que você tem tantos processadores quanto threads, mas ainda uma vantagem) Para uma tarefa de casa, tivemos que mesclar vários milhares de matrizes 2D (todas as mesmas dimensões) somando os valores em cada índice. Com junção de forquilha e processadores P, o tempo se aproxima de log2 (n), enquanto P se aproxima do infinito.
1 2 3 .. 7 3 1 .... 8 5 4
4 5 6 + 2 4 3 => 6 9 9
7 8 9 .. 1 1 0 .... 8 9 9
fonte
Você ficaria surpreso com o desempenho do ForkJoin em aplicativos como rastreador. Aqui está o melhor tutorial com o qual você aprenderia.
fonte
Se o problema é tal que precisamos esperar que outros threads sejam concluídos (como no caso de classificação da matriz ou soma da matriz), a junção de bifurcação deve ser usada, pois o Executor (Executors.newFixedThreadPool (2)) engasgará devido a limitações número de processos. O conjunto de junções de garfo criará mais encadeamentos nesse caso para encobrir o encadeamento bloqueado para manter o mesmo paralelismo
Fonte: http://www.oracle.com/technetwork/articles/java/fork-join-422606.html
O problema com os executores para implementar algoritmos de divisão e conquista não está relacionado à criação de subtarefas, porque um Callable é livre para enviar uma nova subtarefa ao executor e aguardar o resultado de maneira síncrona ou assíncrona. O problema é o paralelismo: quando um Callable aguarda o resultado de outro Callable, ele é colocado em um estado de espera, desperdiçando a oportunidade de lidar com outro Callable na fila para execução.
A estrutura fork / join adicionada ao pacote java.util.concurrent no Java SE 7 através dos esforços de Doug Lea preenche essa lacuna
Fonte: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html
O pool tenta manter encadeamentos ativos (ou disponíveis) suficientes, adicionando, suspendendo ou retomando dinamicamente encadeamentos internos do trabalhador, mesmo que algumas tarefas estejam paralisadas aguardando a junção de outras. No entanto, esses ajustes não são garantidos diante de E / S bloqueadas ou outras sincronizações não gerenciadas
public int getPoolSize () Retorna o número de threads de trabalho que foram iniciados, mas ainda não terminados. O resultado retornado por esse método pode diferir de getParallelism () quando threads são criados para manter o paralelismo quando outros são bloqueados cooperativamente.
fonte
Gostaria de adicionar uma resposta curta para aqueles que não têm muito tempo para ler respostas longas. A comparação é feita no livro Applied Akka Patterns:
fonte