Java ThreadPoolExecutor: A atualização do tamanho do pool principal rejeita dinamicamente as tarefas recebidas intermitentemente

13

Estou enfrentando um problema em que, se eu tentar redimensionar o ThreadPoolExecutortamanho de um pool principal para um número diferente após a criação do pool, então, de forma intermitente, algumas tarefas serão rejeitadas, RejectedExecutionExceptionmesmo que eu nunca envie mais do que um queueSize + maxPoolSizenúmero de tarefas.

O problema que estou tentando resolver é estender ThreadPoolExecutoro redimensionamento de seus threads principais com base nas execuções pendentes que estão na fila do pool de threads. Eu preciso disso porque, por padrão, um ThreadPoolExecutorcriará um novo Threadapenas se a fila estiver cheia.

Aqui está um pequeno programa Pure Java 8 independente que demonstra o problema.

import static java.lang.Math.max;
import static java.lang.Math.min;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolResizeTest {

    public static void main(String[] args) throws Exception {
        // increase the number of iterations if unable to reproduce
        // for me 100 iterations have been enough
        int numberOfExecutions = 100;

        for (int i = 1; i <= numberOfExecutions; i++) {
            executeOnce();
        }
    }

    private static void executeOnce() throws Exception {
        int minThreads = 1;
        int maxThreads = 5;
        int queueCapacity = 10;

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                minThreads, maxThreads,
                0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()
        );

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
                0, 10, TimeUnit.MILLISECONDS);
        CompletableFuture<Void> taskBlocker = new CompletableFuture<>();

        try {
            int totalTasksToSubmit = queueCapacity + maxThreads;

            for (int i = 1; i <= totalTasksToSubmit; i++) {
                // following line sometimes throws a RejectedExecutionException
                pool.submit(() -> {
                    // block the thread and prevent it from completing the task
                    taskBlocker.join();
                });
                // Thread.sleep(10); //enabling even a small sleep makes the problem go away
            }
        } finally {
            taskBlocker.complete(null);
            scheduler.shutdown();
            pool.shutdown();
        }
    }

    /**
     * Resize the thread pool if the number of pending tasks are non-zero.
     */
    private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
        int pendingExecutions = pool.getQueue().size();
        int approximateRunningExecutions = pool.getActiveCount();

        /*
         * New core thread count should be the sum of pending and currently executing tasks
         * with an upper bound of maxThreads and a lower bound of minThreads.
         */
        int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));

        pool.setCorePoolSize(newThreadCount);
        pool.prestartAllCoreThreads();
    }
}

Por que o pool deve lançar uma RejectedExecutionException se eu nunca enviar mais do que o queueCapacity + maxThreads. Como nunca altero o número máximo de threads, por definição do ThreadPoolExecutor, ele deve acomodar a tarefa em um Thread ou na fila.

Obviamente, se eu nunca redimensionar o pool, ele nunca rejeitará nenhum envio. Isso também é difícil de depurar, pois adicionar qualquer tipo de atraso nos envios faz com que o problema desapareça.

Alguma dica sobre como corrigir o RejectedExecutionException?

Swaranga Sarma
fonte
Por que não fornecer sua própria implementação ExecutorServiceenvolvendo uma já existente, que reenvia as tarefas que falharam no envio devido ao redimensionamento?
daniu 13/03
@daniu que é uma solução alternativa. O ponto das perguntas é por que o pool deve lançar uma RejectedExecutionException se eu nunca enviar mais do que o queueCapacity + maxThreads. Como nunca altero o número máximo de threads, por definição do ThreadPoolExecutor, ele deve acomodar a tarefa em um Thread ou na fila.
Swaranga Sarma 13/03
Ok, parece que eu entendi mal a sua pergunta. O que é isso? Você quer saber por que o comportamento ocorre ou como você o contorna, causando problemas para você?
daniu 13/03
Sim, alterar minha implementação para um serviço executor não é viável, pois grande parte do código se refere ao ThreadPoolExecutor. Portanto, se eu ainda queria ter um ThreadPoolExecutor redimensionável, preciso saber como corrigi-lo. A maneira correta de fazer algo assim é estender o ThreadPoolExecutor e obter acesso a algumas de suas variáveis ​​protegidas e atualizar o tamanho do pool dentro de um bloco sincronizado em um bloqueio compartilhado pela superclasse.
Swaranga Sarma 13/03
A extensão ThreadPoolExecutoré provavelmente uma péssima idéia, e você também não precisaria alterar o código existente? Seria melhor você fornecer um exemplo de como seu código real acessa o executor. Eu ficaria surpreso se ele usasse muitos métodos específicos para ThreadPoolExecutor(ou seja, não dentro ExecutorService).
daniu 13/03

Respostas:

5

Aqui está um cenário pelo qual isso está acontecendo:

No meu exemplo, uso minThreads = 0, maxThreads = 2 e queueCapacity = 2 para torná-lo mais curto. O primeiro comando é enviado, isso é feito no método execute:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

para este comando, workQueue.offer (command) than addWorker (null, false) é executado. O encadeamento do trabalhador primeiro remove esse comando da fila no método de execução do encadeamento, portanto, no momento, a fila ainda possui um comando,

O segundo comando é enviado dessa vez, workQueue.offer (command) é executado. Agora a fila está cheia

Agora o ScheduledExecutorService executa o método resizeThreadPool que chama setCorePoolSize com maxThreads. Aqui está o método setCorePoolSize:

 public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

Este método adiciona um trabalhador usando addWorker (null, true). Não há duas filas de trabalhadores em execução, o máximo e a fila está cheia.

O terceiro comando é enviado e falha porque workQueue.offer (command) e addWorker (command, false) falham, levando à Exceção:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@24c22fe rejected from java.util.concurrent.ThreadPoolExecutor@cd1e646[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at ThreadPoolResizeTest.executeOnce(ThreadPoolResizeTest.java:60)
at ThreadPoolResizeTest.runTest(ThreadPoolResizeTest.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:292)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)

Penso que para resolver este problema, você deve definir a capacidade da fila para o máximo de comandos que deseja executar.

Thomas Krieger
fonte
Corrigir. Consegui me reproduzir, copiando o código para minha própria classe e adicionando registradores. Basicamente, quando a fila está cheia e eu envio uma nova tarefa, ela tenta criar um novo Trabalhador. Enquanto isso, nesse momento, meu redimensionador também chama setCorePoolSize para 2, o que também cria um novo Worker. Neste ponto, dois trabalhadores estão competindo para serem adicionados, mas ambos não podem ser, porque isso violaria a restrição de tamanho máximo do pool, de modo que o envio da nova tarefa fosse rejeitado. Eu acho que essa é uma condição de corrida e enviei um relatório de bug ao OpenJDK. Vamos ver. Mas você respondeu minha pergunta para receber a recompensa. Obrigado.
Swaranga Sarma 20/03
2

Não tenho certeza se isso é qualificado como bug. Esse é o comportamento quando os threads de trabalho adicionais são criados após a fila estar cheia, mas isso foi observado nos documentos java que o chamador precisa lidar com as tarefas que estão sendo rejeitadas.

Documentos Java

Fábrica para novos segmentos. Todos os threads são criados usando esta fábrica (através do método addWorker). Todos os chamadores devem estar preparados para que o addWorker falhe, o que pode refletir uma política do sistema ou do usuário que limita o número de threads. Mesmo que não seja tratado como um erro, a falha na criação de encadeamentos pode resultar na rejeição de novas tarefas ou na permanência de tarefas existentes na fila.

Quando você redimensiona o tamanho do pool principal, digamos que aumente, os trabalhadores adicionais são criados ( addWorkermétodo in setCorePoolSize) e a chamada para criar trabalho adicional ( addWorkermétodo from execute) é rejeitada quando o addWorkerretorno false ( add Workerúltimo trecho de código) já que os trabalhadores adicionais suficientes já estão criado por, setCorePoolSize mas ainda não executado, para refletir a atualização na fila .

Peças relevantes

Comparar

public void setCorePoolSize(int corePoolSize) {
    ....
    int k = Math.min(delta, workQueue.size());
    while (k-- > 0 && addWorker(null, true)) {
        if (workQueue.isEmpty())
             break;
    }
}

public void execute(Runnable command) {
    ...
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
....
   if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
     return false;             
}

Use o manipulador de execução de rejeição de nova tentativa personalizada (isso deve funcionar para o seu caso, pois você tem o limite superior como tamanho máximo do conjunto). Ajuste conforme necessário.

public static class RetryRejectionPolicy implements RejectedExecutionHandler {
    public RetryRejectionPolicy () {}

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
           while(true)
            if(e.getQueue().offer(r)) break;
        }
    }
}

ThreadPoolExecutor pool = new ThreadPoolExecutor(
      minThreads, maxThreads,
      0, TimeUnit.SECONDS,
      new LinkedBlockingQueue<Runnable>(queueCapacity),
      new ThreadPoolResizeTest.RetryRejectionPolicy()
 );

Observe também que o uso do encerramento não está correto, pois isso não aguardará a conclusão da tarefa enviada, mas o utilizará em awaitTerminationvez disso.

Sagar Veeram
fonte
Eu acho que o desligamento aguarda as tarefas já enviadas, de acordo com o JavaDoc: shutdown () Inicia um desligamento ordenado no qual as tarefas enviadas anteriormente são executadas, mas nenhuma nova tarefa será aceita.
Thomas Krieger
@ThomasKrieger - Ele executará as tarefas já enviadas, mas não esperará que elas terminem - em docs docs.oracle.com/javase/7/docs/api/java/util/concurrent/… - Este método não aguarda o envio prévio tarefas para concluir a execução. Use waititTermination para fazer isso.
Sagar Veeram 16/03