Como fazer com que o ThreadPoolExecutor aumente os threads ao máximo antes de enfileirar?

99

Já faz algum tempo que estou frustrado com o comportamento padrão ThreadPoolExecutorque sustenta os ExecutorServicepools de threads que tantos de nós usamos. Para citar os Javadocs:

Se houver mais de corePoolSize, mas menos de threads maximumPoolSize em execução, um novo thread será criado apenas se a fila estiver cheia .

O que isso significa é que se você definir um pool de threads com o código a seguir, ele nunca iniciará o segundo thread porque o LinkedBlockingQueueé ilimitado.

ExecutorService threadPool =
   new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
      TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

Somente se você tiver uma fila limitada e a fila estiver cheia, quaisquer threads acima do número do núcleo serão iniciados. Suspeito que um grande número de programadores Java multithreaded júnior não estão cientes desse comportamento do ThreadPoolExecutor.

Agora eu tenho um caso de uso específico em que isso não é o ideal. Estou procurando maneiras, sem escrever minha própria classe TPE, de contornar isso.

Meus requisitos são para um serviço da web que está retornando a chamada para um terceiro possivelmente não confiável.

  • Não quero fazer o retorno de chamada de forma síncrona com a solicitação da web, então quero usar um pool de threads.
  • Normalmente recebo alguns desses por minuto, então não quero ter um newFixedThreadPool(...)com um grande número de threads que, em sua maioria, estão inativos.
  • De vez em quando, recebo uma explosão desse tráfego e desejo aumentar o número de threads para algum valor máximo (digamos 50).
  • Preciso fazer o melhor possível para fazer todos os retornos de chamada, então quero enfileirar qualquer um adicional acima de 50. Não quero sobrecarregar o resto do meu servidor da web usando um newCachedThreadPool().

Como posso contornar essa limitação de ThreadPoolExecutoronde a fila precisa ser delimitada e cheia antes que mais threads sejam iniciados? Como posso fazer com que ele inicie mais threads antes de colocar as tarefas na fila?

Editar:

@Flavio faz uma boa observação sobre o uso de ThreadPoolExecutor.allowCoreThreadTimeOut(true)para que os threads principais expirem e saiam. Eu considerei isso, mas ainda queria o recurso core-threads. Eu não queria que o número de threads no pool caísse abaixo do tamanho do núcleo, se possível.

cinzento
fonte
1
Dado que seu exemplo cria um máximo de 10 threads, há alguma economia real em usar algo que aumenta / diminui em um pool de threads de tamanho fixo?
bstempi
Boa observação @bstempi. O número era um tanto arbitrário. Eu aumentei na pergunta para 50. Não tenho certeza de quantos threads simultâneos eu realmente quero trabalhar agora que tenho essa solução.
Gray,
1
Oh droga! 10 votos positivos se pudesse aqui, exatamente a mesma posição em que estou.
Eugene

Respostas:

50

Como posso contornar essa limitação de ThreadPoolExecutoronde a fila precisa ser limitada e cheia antes que mais threads sejam iniciados.

Acredito que finalmente encontrei uma solução um tanto elegante (talvez um pouco hackeada) para essa limitação com ThreadPoolExecutor. Trata-se LinkedBlockingQueuede estender para que ele retorne falsepara queue.offer(...)quando já houver algumas tarefas na fila. Se os threads atuais não estiverem acompanhando as tarefas enfileiradas, o TPE adicionará threads adicionais. Se o pool já estiver no máximo de threads, o RejectedExecutionHandlerserá chamado. É o manipulador que então faz o put(...)na fila.

Certamente é estranho escrever uma fila onde offer(...)pode retornar falsee put()nunca bloquear, então essa é a parte do hack. Mas isso funciona bem com o uso da fila pelo TPE, então não vejo nenhum problema em fazer isso.

Aqui está o código:

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        // Offer it to the queue if there is 0 items already queued, else
        // return false so the TPE will add another thread. If we return false
        // and max threads have been reached then the RejectedExecutionHandler
        // will be called which will do the put into the queue.
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // This does the actual put into the queue. Once the max threads
            //  have been reached, the tasks will then queue up.
            executor.getQueue().put(r);
            // we do this after the put() to stop race conditions
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + e);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

Com esse mecanismo, quando eu enviar tarefas para a fila, a ThreadPoolExecutorvontade:

  1. Dimensione o número de threads até o tamanho do núcleo inicialmente (aqui 1).
  2. Ofereça para a fila. Se a fila estiver vazia, ela será enfileirada para ser tratada pelos threads existentes.
  3. Se a fila já tiver 1 ou mais elementos, o offer(...)retornará falso.
  4. Se false for retornado, aumente o número de threads no pool até que atinjam o número máximo (aqui 50).
  5. Se estiver no máximo, ele chama o RejectedExecutionHandler
  6. Em RejectedExecutionHandlerseguida, coloca a tarefa na fila para ser processada pelo primeiro thread disponível na ordem FIFO.

Embora no meu código de exemplo acima, a fila seja ilimitada, você também pode defini-la como uma fila limitada. Por exemplo, se você adicionar uma capacidade de 1000 ao LinkedBlockingQueue, ele irá:

  1. dimensionar os fios até o máximo
  2. então enfileire até que esteja cheio com 1000 tarefas
  3. em seguida, bloqueie o chamador até que haja espaço disponível para a fila.

Além disso, se você precisar usar offer(...)no RejectedExecutionHandler, poderá usar o offer(E, long, TimeUnit)método em vez de Long.MAX_VALUEcomo o tempo limite.

Aviso:

Se você espera que as tarefas sejam adicionadas ao executor após ele ter sido encerrado, você pode querer ser mais esperto quanto a RejectedExecutionExceptiondescartar nosso serviço personalizado RejectedExecutionHandlerquando o executor-serviço for encerrado. Obrigado a @RaduToader por apontar isso.

Editar:

Outro ajuste para essa resposta poderia ser perguntar ao TPE se há threads inativos e apenas enfileirar o item se houver. Você teria que fazer uma verdadeira classe para isso e adicionar um ourQueue.setThreadPoolExecutor(tpe);método nela.

Então, seu offer(...)método pode ser semelhante a:

  1. Verifique se tpe.getPoolSize() == tpe.getMaximumPoolSize()nesse caso basta ligar super.offer(...).
  2. Caso contrário tpe.getPoolSize() > tpe.getActiveCount(), chame, super.offer(...)pois parece haver threads inativos.
  3. Caso contrário, volte falsepara bifurcar outro tópico.

Talvez isto:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

Observe que os métodos get no TPE são caros, pois acessam volatilecampos ou (no caso de getActiveCount()) bloqueiam o TPE e percorrem a lista de threads. Além disso, existem condições de corrida aqui que podem fazer com que uma tarefa seja enfileirada incorretamente ou outro encadeamento bifurcado quando havia um encadeamento ocioso.

cinzento
fonte
Eu também lutei com o mesmo problema, acabei substituindo o método execute. Mas esta é realmente uma boa solução. :)
Batty
Por mais que eu não goste da ideia de quebrar o contrato Queuepara conseguir isso, você certamente não está sozinho em sua ideia: groovy-programming.com/post/26923146865
bstempi
3
Você não acha estranho aqui que as primeiras tarefas serão enfileiradas, e somente depois que novos threads surgirem? Por exemplo, se o seu thread principal estiver ocupado com uma única tarefa de longa duração e você ligar execute(runnable), ele runnableserá adicionado à fila. Se você ligar execute(secondRunnable), ele secondRunnableserá adicionado à fila. Mas agora, se você chamar execute(thirdRunnable), thirdRunnableserá executado em um novo tópico. O runnablee secondRunnableexecuta apenas uma vez thirdRunnable(ou a tarefa original de longa duração) são concluídos.
Robert Tupelo-Schneck
1
Sim, Robert está certo, em um ambiente altamente multithread, a fila às vezes aumenta enquanto há threads livres para usar. A solução abaixo que estende o TPE - funciona muito melhor. Acho que a sugestão de Robert deve ser marcada como resposta, embora o hack acima seja interessante
Quero Saber Tudo
1
O "RejectedExecutionHandler" ajudou o executor no desligamento. Agora você está sendo forçado a usar shutdownNow () porque shutdown () não impede que novas tarefas sejam adicionadas (por causa do reque)
Radu Toader
28

Defina o tamanho do núcleo e o tamanho máximo com o mesmo valor e permita que os encadeamentos principais sejam removidos do conjunto com allowCoreThreadTimeOut(true).

Flavio
fonte
+1 Sim, eu pensei nisso, mas ainda queria ter o recurso core-threads. Eu não queria que o pool de threads fosse para 0 threads durante os períodos inativos. Vou editar minha pergunta para apontar isso. Mas excelente ponto.
Grey
Obrigado! É a maneira mais simples de fazer isso.
Dmitry Ovchinnikov
28

Já tenho duas outras respostas para essa pergunta, mas suspeito que esta seja a melhor.

Baseia-se na técnica da resposta atualmente aceita , a saber:

  1. Substitua o offer()método da fila para (às vezes) retornar falso,
  2. que faz ThreadPoolExecutorcom que o gere um novo thread ou rejeite a tarefa, e
  3. defina o RejectedExecutionHandlerpara realmente enfileirar a tarefa na rejeição.

O problema é quando offer()deve retornar falso. A resposta aceita atualmente retorna falso quando a fila contém algumas tarefas, mas, como indiquei em meu comentário, isso causa efeitos indesejáveis. Como alternativa, se você sempre retornar false, continuará gerando novos threads, mesmo quando houver threads esperando na fila.

A solução é usar Java 7 LinkedTransferQueuee fazer offer()call tryTransfer(). Quando há um segmento de consumidor em espera, a tarefa será simplesmente passada para esse segmento. Caso contrário, offer()retornará falso e ThreadPoolExecutorgerará um novo tópico.

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
Robert Tupelo-Schneck
fonte
Eu tenho que concordar, isso parece mais limpo para mim. A única desvantagem da solução é que LinkedTransferQueue é ilimitado para que você não obtenha uma fila de tarefas limitada por capacidade sem trabalho extra.
Yeroc
Há um problema quando a piscina atinge o tamanho máximo. Digamos que pool foi ampliado para o tamanho máximo e cada thread está atualmente executando uma tarefa, quando o runnable for enviado, esta oferta impl retornará false e ThreadPoolExecutor tenta adicionar o thread de Worker, mas o pool já atingiu seu máximo, então o runnable será simplesmente rejeitado. De acordo com o rejeitadoExceHandler que você escreveu, ele será colocado na fila novamente, resultando nessa dança do macaco novamente desde o início.
Sudheera 01 de
1
@Sudheera Acho que você está enganado. queue.offer(), porque está realmente chamando LinkedTransferQueue.tryTransfer(), retornará falso e não enfileirará a tarefa. Porém as RejectedExecutionHandlerchamadas queue.put(), que não falham e enfileiram a tarefa.
Robert Tupelo-Schneck,
1
@RobertTupelo-Schneck extremamente útil e agradável!
Eugene
1
@RobertTupelo-Schneck Funciona como um encanto! Não sei por que não há algo assim fora da caixa no java
Georgi Peev
7

Nota: agora prefiro e recomendo minha outra resposta .

Aqui está uma versão que me parece muito mais direta: Aumente o corePoolSize (até o limite de maximumPoolSize) sempre que uma nova tarefa for executada e, em seguida, diminua o corePoolSize (até o limite do "tamanho do pool principal" especificado pelo usuário) sempre que um tarefa concluída.

Em outras palavras, controle o número de tarefas em execução ou enfileiradas e certifique-se de que corePoolSize seja igual ao número de tarefas, contanto que esteja entre o "tamanho do pool principal" especificado pelo usuário e o maximumPoolSize.

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}

Conforme escrito, a classe não oferece suporte à alteração do corePoolSize ou maximumPoolSize especificado pelo usuário após a construção e não oferece suporte à manipulação da fila de trabalho diretamente ou por meio de remove()ou purge().

Robert Tupelo-Schneck
fonte
Eu gosto, exceto pelos synchronizedblocos. Você pode ligar para a fila para obter o número de tarefas. Ou talvez use um AtomicInteger?
Cinza
Eu queria evitá-los, mas o problema é esse. Se houver várias execute()chamadas em threads separados, cada uma irá (1) descobrir quantos threads são necessários, (2) setCorePoolSizepara esse número e (3) chamar super.execute(). Se as etapas (1) e (2) não estiverem sincronizadas, não tenho certeza de como evitar uma ordem infeliz em que você define o tamanho do pool principal para um número menor após um número maior. Com acesso direto ao campo da superclasse, isso poderia ser feito usando compare-and-set, mas não vejo uma maneira limpa de fazer isso em uma subclasse sem sincronização.
Robert Tupelo-Schneck
Acho que as penalidades para essa condição de corrida são relativamente baixas, desde que o taskCountcampo seja válido (ou seja, a AtomicInteger). Se dois threads recalcularem o tamanho do pool imediatamente após o outro, ele deve obter os valores adequados. Se o segundo encolhe os threads principais, então deve ter visto uma queda na fila ou algo assim.
Grey
1
Infelizmente, acho que é pior do que isso. Suponha que as tarefas 10 e 11 sejam chamadas execute(). Cada um ligará atomicTaskCount.incrementAndGet()e receberá 10 e 11 respectivamente. Mas sem sincronização (além de obter a contagem de tarefas e definir o tamanho do pool principal), você poderia obter (1) a tarefa 11 define o tamanho do pool principal para 11, (2) a tarefa 10 define o tamanho do pool principal para 10, (3) a tarefa 10 chama super.execute(), (4) a tarefa 11 chama super.execute()e é enfileirada.
Robert Tupelo-Schneck
2
Eu testei seriamente essa solução e ela é claramente a melhor. Em um ambiente altamente multithread, às vezes ainda enfileirará quando houver threads livres (devido à natureza TPE.execute de thread livre), mas isso acontece raramente, ao contrário da solução marcada como resposta, onde a condição de corrida tem mais chances de acontecer, então isso acontece praticamente em cada execução multi-thread.
Quero Saber Tudo
6

Temos uma subclasse de ThreadPoolExecutorque leva um adicional creationThresholde substitui execute.

public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}

talvez isso ajude também, mas o seu parece mais artístico, é claro ...

Ralf H
fonte
Interessante. Obrigado por isso. Na verdade, eu não sabia que o tamanho do núcleo era mutável.
Gray
Agora que penso um pouco mais, esta solução é melhor do que a minha em termos de verificação do tamanho da fila. Eu ajustei minha resposta para que o offer(...)método só retorne falsecondicionalmente. Obrigado!
Gray
4

A resposta recomendada resolve apenas um (1) dos problemas com o pool de threads JDK:

  1. Os conjuntos de encadeamentos JDK são direcionados ao enfileiramento. Portanto, em vez de gerar um novo thread, eles enfileirarão a tarefa. Somente se a fila atingir seu limite, o pool de threads gerará uma nova thread.

  2. A retirada da linha não acontece quando a carga fica mais leve. Por exemplo, se tivermos uma explosão de jobs atingindo o pool que faz com que o pool chegue ao máximo, seguido por uma carga leve de no máximo 2 tarefas por vez, o pool usará todos os threads para atender à carga leve, evitando a retirada do thread. (apenas 2 tópicos seriam necessários ...)

Insatisfeito com o comportamento acima, fui em frente e implementei um pool para superar as deficiências acima.

Para resolver 2) Usar o agendamento do Lifo resolve o problema. Esta ideia foi apresentada por Ben Maurer na conferência de aplicativos ACM 2015: escala Systems @ Facebook

Então, uma nova implementação nasceu:

LifoThreadPoolExecutorSQP

Até agora, esta implementação melhora o desempenho de execução assíncrona para ZEL .

A implementação tem capacidade de rotação para reduzir a sobrecarga da troca de contexto, gerando desempenho superior para determinados casos de uso.

Espero que ajude...

PS: JDK Fork Join Pool implementa ExecutorService e funciona como um pool de threads "normal", a implementação tem um bom desempenho, usa o agendamento de threads LIFO, no entanto, não há controle sobre o tamanho da fila interna, tempo limite de desativação ... e, o mais importante, as tarefas não podem ser interrompido ao cancelá-los

user2179737
fonte
1
Uma pena que esta implementação tenha tantas dependências externas. Tornando-o inútil para mim: - /
Martin L.
1
É um ponto muito bom (2º). Infelizmente, a implementação não está clara nas dependências externas, mas ainda pode ser adotada se você quiser.
Alexey Vlasov
1

Nota: agora prefiro e recomendo minha outra resposta .

Tenho outra proposta, seguindo a ideia original de mudar a queue para retornar false. Neste todas as tarefas podem entrar na fila, mas sempre que uma tarefa é enfileirada depois execute(), nós a seguimos com uma tarefa sem operação sentinela que a fila rejeita, fazendo com que um novo thread seja gerado, que executará o no-op imediatamente seguido por algo da fila.

Como os threads de trabalho podem estar pesquisando o LinkedBlockingQueuepara uma nova tarefa, é possível que uma tarefa seja enfileirada mesmo quando há um thread disponível. Para evitar a geração de novos threads, mesmo quando há threads disponíveis, precisamos manter o controle de quantos threads estão esperando por novas tarefas na fila e apenas gerar um novo thread quando houver mais tarefas na fila do que threads em espera.

final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };

final AtomicInteger waitingThreads = new AtomicInteger(0);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        // offer returning false will cause the executor to spawn a new thread
        if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
        else return super.offer(e);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.poll(timeout, unit);
        } finally {
            waitingThreads.decrementAndGet();
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.take();
        } finally {
            waitingThreads.decrementAndGet();
        }
    }
};

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
    @Override
    public void execute(Runnable command) {
        super.execute(command);
        if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
    }
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r == SENTINEL_NO_OP) return;
        else throw new RejectedExecutionException();            
    }
});
Robert Tupelo-Schneck
fonte
0

A melhor solução que posso pensar é estender.

ThreadPoolExecutoroferece alguns métodos de gancho: beforeExecutee afterExecute. Em sua extensão, você pode manter o uso de uma fila limitada para alimentar as tarefas e uma segunda fila ilimitada para lidar com o estouro. Quando alguém liga submit, você pode tentar colocar a solicitação na fila limitada. Se você encontrar uma exceção, basta colocar a tarefa em sua fila de estouro. Você pode então utilizar o afterExecutegancho para ver se há algo na fila de estouro após terminar uma tarefa. Dessa forma, o executor cuidará das coisas em sua fila limitada primeiro e puxará automaticamente dessa fila ilimitada conforme o tempo permitir.

Parece mais trabalho do que a sua solução, mas pelo menos não envolve dar às filas comportamentos inesperados. Também imagino que haja uma maneira melhor de verificar o status da fila e dos threads em vez de depender de exceções, que são bastante lentas para serem lançadas.

Bstempi
fonte
Não gosto dessa solução. Tenho certeza de que ThreadPoolExecutor não foi projetado para herança.
scottb
Na verdade, há um exemplo de extensão direto no JavaDoc. Eles afirmam que a maioria provavelmente apenas implementará os métodos de gancho, mas dizem o que mais você precisa observar ao estender.
bstempi
0

Nota: Para JDK ThreadPoolExecutor quando você tem uma fila limitada, você só está criando novos encadeamentos quando a oferta está retornando false. Você pode obter algo útil com CallerRunsPolicy, que cria um pouco de BackPressure e chama diretamente o thread do chamador.

Preciso que as tarefas sejam executadas a partir de threads criados pelo pool e tenha uma fila de ubounded para agendamento, enquanto o número de threads dentro do pool pode aumentar ou diminuir entre corePoolSize e maximumPoolSize, então ...

Acabei fazendo um copy paste completo de ThreadPoolExecutor e mudei um pouco o método execute porque infelizmente isso não poderia ser feito por extensão (ele chama métodos privados).

Eu não queria gerar novos encadeamentos imediatamente quando uma nova solicitação chegasse e todos os encadeamentos estivessem ocupados (porque em geral tenho tarefas de curta duração). Eu adicionei um limite, mas sinta-se à vontade para alterá-lo de acordo com suas necessidades (talvez para principalmente no IO seja melhor remover esse limite)

private final AtomicInteger activeWorkers = new AtomicInteger(0);
private volatile double threshold = 0.7d;

protected void beforeExecute(Thread t, Runnable r) {
    activeWorkers.incrementAndGet();
}
protected void afterExecute(Runnable r, Throwable t) {
    activeWorkers.decrementAndGet();
}
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        if (isRunning(c) && this.workQueue.offer(command)) {
            int recheck = this.ctl.get();
            if (!isRunning(recheck) && this.remove(command)) {
                this.reject(command);
            } else if (workerCountOf(recheck) == 0) {
                this.addWorker((Runnable) null, false);
            }
            //>>change start
            else if (workerCountOf(recheck) < maximumPoolSize //
                && (activeWorkers.get() > workerCountOf(recheck) * threshold
                    || workQueue.size() > workerCountOf(recheck) * threshold)) {
                this.addWorker((Runnable) null, false);
            }
            //<<change end
        } else if (!this.addWorker(command, false)) {
            this.reject(command);
        }
    }
Radu Toader
fonte