Já faz algum tempo que estou frustrado com o comportamento padrão ThreadPoolExecutor
que sustenta os ExecutorService
pools de threads que tantos de nós usamos. Para citar os Javadocs:
Se houver mais de corePoolSize, mas menos de threads maximumPoolSize em execução, um novo thread será criado apenas se a fila estiver cheia .
O que isso significa é que se você definir um pool de threads com o código a seguir, ele nunca iniciará o segundo thread porque o LinkedBlockingQueue
é ilimitado.
ExecutorService threadPool =
new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
Somente se você tiver uma fila limitada e a fila estiver cheia, quaisquer threads acima do número do núcleo serão iniciados. Suspeito que um grande número de programadores Java multithreaded júnior não estão cientes desse comportamento do ThreadPoolExecutor
.
Agora eu tenho um caso de uso específico em que isso não é o ideal. Estou procurando maneiras, sem escrever minha própria classe TPE, de contornar isso.
Meus requisitos são para um serviço da web que está retornando a chamada para um terceiro possivelmente não confiável.
- Não quero fazer o retorno de chamada de forma síncrona com a solicitação da web, então quero usar um pool de threads.
- Normalmente recebo alguns desses por minuto, então não quero ter um
newFixedThreadPool(...)
com um grande número de threads que, em sua maioria, estão inativos. - De vez em quando, recebo uma explosão desse tráfego e desejo aumentar o número de threads para algum valor máximo (digamos 50).
- Preciso fazer o melhor possível para fazer todos os retornos de chamada, então quero enfileirar qualquer um adicional acima de 50. Não quero sobrecarregar o resto do meu servidor da web usando um
newCachedThreadPool()
.
Como posso contornar essa limitação de ThreadPoolExecutor
onde a fila precisa ser delimitada e cheia antes que mais threads sejam iniciados? Como posso fazer com que ele inicie mais threads antes de colocar as tarefas na fila?
Editar:
@Flavio faz uma boa observação sobre o uso de ThreadPoolExecutor.allowCoreThreadTimeOut(true)
para que os threads principais expirem e saiam. Eu considerei isso, mas ainda queria o recurso core-threads. Eu não queria que o número de threads no pool caísse abaixo do tamanho do núcleo, se possível.
Respostas:
Acredito que finalmente encontrei uma solução um tanto elegante (talvez um pouco hackeada) para essa limitação com
ThreadPoolExecutor
. Trata-seLinkedBlockingQueue
de estender para que ele retornefalse
paraqueue.offer(...)
quando já houver algumas tarefas na fila. Se os threads atuais não estiverem acompanhando as tarefas enfileiradas, o TPE adicionará threads adicionais. Se o pool já estiver no máximo de threads, oRejectedExecutionHandler
será chamado. É o manipulador que então faz oput(...)
na fila.Certamente é estranho escrever uma fila onde
offer(...)
pode retornarfalse
eput()
nunca bloquear, então essa é a parte do hack. Mas isso funciona bem com o uso da fila pelo TPE, então não vejo nenhum problema em fazer isso.Aqui está o código:
Com esse mecanismo, quando eu enviar tarefas para a fila, a
ThreadPoolExecutor
vontade:offer(...)
retornará falso.RejectedExecutionHandler
RejectedExecutionHandler
seguida, coloca a tarefa na fila para ser processada pelo primeiro thread disponível na ordem FIFO.Embora no meu código de exemplo acima, a fila seja ilimitada, você também pode defini-la como uma fila limitada. Por exemplo, se você adicionar uma capacidade de 1000 ao
LinkedBlockingQueue
, ele irá:Além disso, se você precisar usar
offer(...)
noRejectedExecutionHandler
, poderá usar ooffer(E, long, TimeUnit)
método em vez deLong.MAX_VALUE
como o tempo limite.Aviso:
Se você espera que as tarefas sejam adicionadas ao executor após ele ter sido encerrado, você pode querer ser mais esperto quanto a
RejectedExecutionException
descartar nosso serviço personalizadoRejectedExecutionHandler
quando o executor-serviço for encerrado. Obrigado a @RaduToader por apontar isso.Editar:
Outro ajuste para essa resposta poderia ser perguntar ao TPE se há threads inativos e apenas enfileirar o item se houver. Você teria que fazer uma verdadeira classe para isso e adicionar um
ourQueue.setThreadPoolExecutor(tpe);
método nela.Então, seu
offer(...)
método pode ser semelhante a:tpe.getPoolSize() == tpe.getMaximumPoolSize()
nesse caso basta ligarsuper.offer(...)
.tpe.getPoolSize() > tpe.getActiveCount()
, chame,super.offer(...)
pois parece haver threads inativos.false
para bifurcar outro tópico.Talvez isto:
Observe que os métodos get no TPE são caros, pois acessam
volatile
campos ou (no caso degetActiveCount()
) bloqueiam o TPE e percorrem a lista de threads. Além disso, existem condições de corrida aqui que podem fazer com que uma tarefa seja enfileirada incorretamente ou outro encadeamento bifurcado quando havia um encadeamento ocioso.fonte
Queue
para conseguir isso, você certamente não está sozinho em sua ideia: groovy-programming.com/post/26923146865execute(runnable)
, elerunnable
será adicionado à fila. Se você ligarexecute(secondRunnable)
, elesecondRunnable
será adicionado à fila. Mas agora, se você chamarexecute(thirdRunnable)
,thirdRunnable
será executado em um novo tópico. Orunnable
esecondRunnable
executa apenas uma vezthirdRunnable
(ou a tarefa original de longa duração) são concluídos.Defina o tamanho do núcleo e o tamanho máximo com o mesmo valor e permita que os encadeamentos principais sejam removidos do conjunto com
allowCoreThreadTimeOut(true)
.fonte
Já tenho duas outras respostas para essa pergunta, mas suspeito que esta seja a melhor.
Baseia-se na técnica da resposta atualmente aceita , a saber:
offer()
método da fila para (às vezes) retornar falso,ThreadPoolExecutor
com que o gere um novo thread ou rejeite a tarefa, eRejectedExecutionHandler
para realmente enfileirar a tarefa na rejeição.O problema é quando
offer()
deve retornar falso. A resposta aceita atualmente retorna falso quando a fila contém algumas tarefas, mas, como indiquei em meu comentário, isso causa efeitos indesejáveis. Como alternativa, se você sempre retornar false, continuará gerando novos threads, mesmo quando houver threads esperando na fila.A solução é usar Java 7
LinkedTransferQueue
e fazeroffer()
calltryTransfer()
. Quando há um segmento de consumidor em espera, a tarefa será simplesmente passada para esse segmento. Caso contrário,offer()
retornará falso eThreadPoolExecutor
gerará um novo tópico.fonte
queue.offer()
, porque está realmente chamandoLinkedTransferQueue.tryTransfer()
, retornará falso e não enfileirará a tarefa. Porém asRejectedExecutionHandler
chamadasqueue.put()
, que não falham e enfileiram a tarefa.Nota: agora prefiro e recomendo minha outra resposta .
Aqui está uma versão que me parece muito mais direta: Aumente o corePoolSize (até o limite de maximumPoolSize) sempre que uma nova tarefa for executada e, em seguida, diminua o corePoolSize (até o limite do "tamanho do pool principal" especificado pelo usuário) sempre que um tarefa concluída.
Em outras palavras, controle o número de tarefas em execução ou enfileiradas e certifique-se de que corePoolSize seja igual ao número de tarefas, contanto que esteja entre o "tamanho do pool principal" especificado pelo usuário e o maximumPoolSize.
Conforme escrito, a classe não oferece suporte à alteração do corePoolSize ou maximumPoolSize especificado pelo usuário após a construção e não oferece suporte à manipulação da fila de trabalho diretamente ou por meio de
remove()
oupurge()
.fonte
synchronized
blocos. Você pode ligar para a fila para obter o número de tarefas. Ou talvez use umAtomicInteger
?execute()
chamadas em threads separados, cada uma irá (1) descobrir quantos threads são necessários, (2)setCorePoolSize
para esse número e (3) chamarsuper.execute()
. Se as etapas (1) e (2) não estiverem sincronizadas, não tenho certeza de como evitar uma ordem infeliz em que você define o tamanho do pool principal para um número menor após um número maior. Com acesso direto ao campo da superclasse, isso poderia ser feito usando compare-and-set, mas não vejo uma maneira limpa de fazer isso em uma subclasse sem sincronização.taskCount
campo seja válido (ou seja, aAtomicInteger
). Se dois threads recalcularem o tamanho do pool imediatamente após o outro, ele deve obter os valores adequados. Se o segundo encolhe os threads principais, então deve ter visto uma queda na fila ou algo assim.execute()
. Cada um ligaráatomicTaskCount.incrementAndGet()
e receberá 10 e 11 respectivamente. Mas sem sincronização (além de obter a contagem de tarefas e definir o tamanho do pool principal), você poderia obter (1) a tarefa 11 define o tamanho do pool principal para 11, (2) a tarefa 10 define o tamanho do pool principal para 10, (3) a tarefa 10 chamasuper.execute()
, (4) a tarefa 11 chamasuper.execute()
e é enfileirada.Temos uma subclasse de
ThreadPoolExecutor
que leva um adicionalcreationThreshold
e substituiexecute
.talvez isso ajude também, mas o seu parece mais artístico, é claro ...
fonte
offer(...)
método só retornefalse
condicionalmente. Obrigado!A resposta recomendada resolve apenas um (1) dos problemas com o pool de threads JDK:
Os conjuntos de encadeamentos JDK são direcionados ao enfileiramento. Portanto, em vez de gerar um novo thread, eles enfileirarão a tarefa. Somente se a fila atingir seu limite, o pool de threads gerará uma nova thread.
A retirada da linha não acontece quando a carga fica mais leve. Por exemplo, se tivermos uma explosão de jobs atingindo o pool que faz com que o pool chegue ao máximo, seguido por uma carga leve de no máximo 2 tarefas por vez, o pool usará todos os threads para atender à carga leve, evitando a retirada do thread. (apenas 2 tópicos seriam necessários ...)
Insatisfeito com o comportamento acima, fui em frente e implementei um pool para superar as deficiências acima.
Para resolver 2) Usar o agendamento do Lifo resolve o problema. Esta ideia foi apresentada por Ben Maurer na conferência de aplicativos ACM 2015: escala Systems @ Facebook
Então, uma nova implementação nasceu:
LifoThreadPoolExecutorSQP
Até agora, esta implementação melhora o desempenho de execução assíncrona para ZEL .
A implementação tem capacidade de rotação para reduzir a sobrecarga da troca de contexto, gerando desempenho superior para determinados casos de uso.
Espero que ajude...
PS: JDK Fork Join Pool implementa ExecutorService e funciona como um pool de threads "normal", a implementação tem um bom desempenho, usa o agendamento de threads LIFO, no entanto, não há controle sobre o tamanho da fila interna, tempo limite de desativação ... e, o mais importante, as tarefas não podem ser interrompido ao cancelá-los
fonte
Nota: agora prefiro e recomendo minha outra resposta .
Tenho outra proposta, seguindo a ideia original de mudar a queue para retornar false. Neste todas as tarefas podem entrar na fila, mas sempre que uma tarefa é enfileirada depois
execute()
, nós a seguimos com uma tarefa sem operação sentinela que a fila rejeita, fazendo com que um novo thread seja gerado, que executará o no-op imediatamente seguido por algo da fila.Como os threads de trabalho podem estar pesquisando o
LinkedBlockingQueue
para uma nova tarefa, é possível que uma tarefa seja enfileirada mesmo quando há um thread disponível. Para evitar a geração de novos threads, mesmo quando há threads disponíveis, precisamos manter o controle de quantos threads estão esperando por novas tarefas na fila e apenas gerar um novo thread quando houver mais tarefas na fila do que threads em espera.fonte
A melhor solução que posso pensar é estender.
ThreadPoolExecutor
oferece alguns métodos de gancho:beforeExecute
eafterExecute
. Em sua extensão, você pode manter o uso de uma fila limitada para alimentar as tarefas e uma segunda fila ilimitada para lidar com o estouro. Quando alguém ligasubmit
, você pode tentar colocar a solicitação na fila limitada. Se você encontrar uma exceção, basta colocar a tarefa em sua fila de estouro. Você pode então utilizar oafterExecute
gancho para ver se há algo na fila de estouro após terminar uma tarefa. Dessa forma, o executor cuidará das coisas em sua fila limitada primeiro e puxará automaticamente dessa fila ilimitada conforme o tempo permitir.Parece mais trabalho do que a sua solução, mas pelo menos não envolve dar às filas comportamentos inesperados. Também imagino que haja uma maneira melhor de verificar o status da fila e dos threads em vez de depender de exceções, que são bastante lentas para serem lançadas.
fonte
Nota: Para JDK ThreadPoolExecutor quando você tem uma fila limitada, você só está criando novos encadeamentos quando a oferta está retornando false. Você pode obter algo útil com CallerRunsPolicy, que cria um pouco de BackPressure e chama diretamente o thread do chamador.
Preciso que as tarefas sejam executadas a partir de threads criados pelo pool e tenha uma fila de ubounded para agendamento, enquanto o número de threads dentro do pool pode aumentar ou diminuir entre corePoolSize e maximumPoolSize, então ...
Acabei fazendo um copy paste completo de ThreadPoolExecutor e mudei um pouco o método execute porque infelizmente isso não poderia ser feito por extensão (ele chama métodos privados).
Eu não queria gerar novos encadeamentos imediatamente quando uma nova solicitação chegasse e todos os encadeamentos estivessem ocupados (porque em geral tenho tarefas de curta duração). Eu adicionei um limite, mas sinta-se à vontade para alterá-lo de acordo com suas necessidades (talvez para principalmente no IO seja melhor remover esse limite)
fonte