Estou enfrentando um problema em que, se eu tentar redimensionar o ThreadPoolExecutor
tamanho de um pool principal para um número diferente após a criação do pool, então, de forma intermitente, algumas tarefas serão rejeitadas, RejectedExecutionException
mesmo que eu nunca envie mais do que um queueSize + maxPoolSize
número de tarefas.
O problema que estou tentando resolver é estender ThreadPoolExecutor
o redimensionamento de seus threads principais com base nas execuções pendentes que estão na fila do pool de threads. Eu preciso disso porque, por padrão, um ThreadPoolExecutor
criará um novo Thread
apenas se a fila estiver cheia.
Aqui está um pequeno programa Pure Java 8 independente que demonstra o problema.
import static java.lang.Math.max;
import static java.lang.Math.min;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolResizeTest {
public static void main(String[] args) throws Exception {
// increase the number of iterations if unable to reproduce
// for me 100 iterations have been enough
int numberOfExecutions = 100;
for (int i = 1; i <= numberOfExecutions; i++) {
executeOnce();
}
}
private static void executeOnce() throws Exception {
int minThreads = 1;
int maxThreads = 5;
int queueCapacity = 10;
ThreadPoolExecutor pool = new ThreadPoolExecutor(
minThreads, maxThreads,
0, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity),
new ThreadPoolExecutor.AbortPolicy()
);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
0, 10, TimeUnit.MILLISECONDS);
CompletableFuture<Void> taskBlocker = new CompletableFuture<>();
try {
int totalTasksToSubmit = queueCapacity + maxThreads;
for (int i = 1; i <= totalTasksToSubmit; i++) {
// following line sometimes throws a RejectedExecutionException
pool.submit(() -> {
// block the thread and prevent it from completing the task
taskBlocker.join();
});
// Thread.sleep(10); //enabling even a small sleep makes the problem go away
}
} finally {
taskBlocker.complete(null);
scheduler.shutdown();
pool.shutdown();
}
}
/**
* Resize the thread pool if the number of pending tasks are non-zero.
*/
private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
int pendingExecutions = pool.getQueue().size();
int approximateRunningExecutions = pool.getActiveCount();
/*
* New core thread count should be the sum of pending and currently executing tasks
* with an upper bound of maxThreads and a lower bound of minThreads.
*/
int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));
pool.setCorePoolSize(newThreadCount);
pool.prestartAllCoreThreads();
}
}
Por que o pool deve lançar uma RejectedExecutionException se eu nunca enviar mais do que o queueCapacity + maxThreads. Como nunca altero o número máximo de threads, por definição do ThreadPoolExecutor, ele deve acomodar a tarefa em um Thread ou na fila.
Obviamente, se eu nunca redimensionar o pool, ele nunca rejeitará nenhum envio. Isso também é difícil de depurar, pois adicionar qualquer tipo de atraso nos envios faz com que o problema desapareça.
Alguma dica sobre como corrigir o RejectedExecutionException?
fonte
ExecutorService
envolvendo uma já existente, que reenvia as tarefas que falharam no envio devido ao redimensionamento?ThreadPoolExecutor
é provavelmente uma péssima idéia, e você também não precisaria alterar o código existente? Seria melhor você fornecer um exemplo de como seu código real acessa o executor. Eu ficaria surpreso se ele usasse muitos métodos específicos paraThreadPoolExecutor
(ou seja, não dentroExecutorService
).Respostas:
Aqui está um cenário pelo qual isso está acontecendo:
No meu exemplo, uso minThreads = 0, maxThreads = 2 e queueCapacity = 2 para torná-lo mais curto. O primeiro comando é enviado, isso é feito no método execute:
para este comando, workQueue.offer (command) than addWorker (null, false) é executado. O encadeamento do trabalhador primeiro remove esse comando da fila no método de execução do encadeamento, portanto, no momento, a fila ainda possui um comando,
O segundo comando é enviado dessa vez, workQueue.offer (command) é executado. Agora a fila está cheia
Agora o ScheduledExecutorService executa o método resizeThreadPool que chama setCorePoolSize com maxThreads. Aqui está o método setCorePoolSize:
Este método adiciona um trabalhador usando addWorker (null, true). Não há duas filas de trabalhadores em execução, o máximo e a fila está cheia.
O terceiro comando é enviado e falha porque workQueue.offer (command) e addWorker (command, false) falham, levando à Exceção:
Penso que para resolver este problema, você deve definir a capacidade da fila para o máximo de comandos que deseja executar.
fonte
Não tenho certeza se isso é qualificado como bug. Esse é o comportamento quando os threads de trabalho adicionais são criados após a fila estar cheia, mas isso foi observado nos documentos java que o chamador precisa lidar com as tarefas que estão sendo rejeitadas.
Documentos Java
Quando você redimensiona o tamanho do pool principal, digamos que aumente, os trabalhadores adicionais são criados (
addWorker
método insetCorePoolSize
) e a chamada para criar trabalho adicional (addWorker
método fromexecute
) é rejeitada quando oaddWorker
retorno false (add Worker
último trecho de código) já que os trabalhadores adicionais suficientes já estão criado por,setCorePoolSize
mas ainda não executado, para refletir a atualização na fila .Peças relevantes
Comparar
Use o manipulador de execução de rejeição de nova tentativa personalizada (isso deve funcionar para o seu caso, pois você tem o limite superior como tamanho máximo do conjunto). Ajuste conforme necessário.
Observe também que o uso do encerramento não está correto, pois isso não aguardará a conclusão da tarefa enviada, mas o utilizará em
awaitTermination
vez disso.fonte