Quero criar um ThreadPoolExecutor
tal que, quando atingir seu tamanho máximo e a fila estiver cheia, o submit()
método bloqueie ao tentar adicionar novas tarefas. Preciso implementar um personalizado RejectedExecutionHandler
para isso ou existe uma maneira de fazer isso usando uma biblioteca Java padrão?
java
concurrency
executor
Ponto fixo
fonte
fonte
Respostas:
Uma das soluções possíveis que acabei de encontrar:
Existem outras soluções? Eu prefiro algo baseado em,
RejectedExecutionHandler
pois parece uma maneira padrão de lidar com essas situações.fonte
throw e;
está no livro. JCIP está correto!Você pode usar ThreadPoolExecutor e um BlockQueue:
fonte
Você deve usar o
CallerRunsPolicy
, que executa a tarefa rejeitada no thread de chamada. Dessa forma, ele não pode enviar nenhuma tarefa nova ao executor até que a tarefa seja concluída, quando então haverá alguns threads de pool livres ou o processo se repetirá.http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html
Dos documentos:
Além disso, certifique-se de usar uma fila limitada, como ArrayBlockingQueue, ao chamar o
ThreadPoolExecutor
construtor. Caso contrário, nada será rejeitado.Editar: em resposta ao seu comentário, defina o tamanho de ArrayBlockingQueue para ser igual ao tamanho máximo do pool de threads e use a AbortPolicy.
Edit 2: Ok, vejo o que você quer chegar. Que tal isso: sobrescrever o
beforeExecute()
método para verificargetActiveCount()
se não excedegetMaximumPoolSize()
e, se exceder , durma e tente novamente?fonte
O Hibernate tem um
BlockPolicy
que é simples e pode fazer o que você quiser:Veja: Executors.java
fonte
ThreadPoolExecutor
even diz literalmente: "O método getQueue () permite o acesso à fila de trabalho para fins de monitoramento e depuração. O uso desse método para qualquer outro propósito é fortemente desencorajado.". É absolutamente triste ver que isso está disponível em uma biblioteca tão conhecida.A
BoundedExecutor
resposta citada acima do Java Concurrency in Practice só funciona corretamente se você usar uma fila ilimitada para o Executor ou se o limite do semáforo não for maior que o tamanho da fila. O semáforo é o estado compartilhado entre a thread de envio e as threads no pool, tornando possível saturar o executor mesmo se o tamanho da fila <limite <= (tamanho da fila + tamanho do pool).O uso
CallerRunsPolicy
só é válido se suas tarefas não forem executadas para sempre, caso em que seu tópico de envio permanecerá pararejectedExecution
sempre, e uma má ideia se suas tarefas demorarem para serem executadas, porque o tópico de envio não pode enviar novas tarefas ou faça qualquer outra coisa se estiver executando uma tarefa em si.Se isso não for aceitável, sugiro verificar o tamanho da fila limitada do executor antes de enviar uma tarefa. Se a fila estiver cheia, aguarde um pouco antes de tentar enviar novamente. O rendimento será afetado, mas sugiro que seja uma solução mais simples do que muitas das outras soluções propostas e você tem a garantia de que nenhuma tarefa será rejeitada.
fonte
Eu sei, é um hack, mas na minha opinião o hack mais limpo entre os oferecidos aqui ;-)
Como ThreadPoolExecutor usa a fila de bloqueio "oferta" em vez de "colocar", permite substituir o comportamento de "oferta" da fila de bloqueio:
Eu testei e parece funcionar. A implementação de alguma política de tempo limite é deixada como um exercício do leitor.
fonte
A classe a seguir envolve um ThreadPoolExecutor e usa um Semaphore para bloquear a fila de trabalho está cheia:
Essa classe de wrapper é baseada em uma solução fornecida no livro Java Concurrency in Practice de Brian Goetz. A solução no livro leva apenas dois parâmetros do construtor: um
Executor
e um limite usado para o semáforo. Isso é mostrado na resposta dada por Fixpoint. Há um problema com essa abordagem: ela pode chegar a um estado em que os threads do pool estão ocupados, a fila está cheia, mas o semáforo acaba de liberar uma licença. (semaphore.release()
no bloco finalmente). Nesse estado, uma nova tarefa pode obter a licença recém-liberada, mas é rejeitada porque a fila de tarefas está cheia. Claro que isso não é algo que você deseja; você deseja bloquear neste caso.Para resolver isso, devemos usar uma fila ilimitada , como o JCiP claramente menciona. O semáforo atua como um guarda, dando o efeito de um tamanho de fila virtual. Isso tem o efeito colateral de que é possível que a unidade possa conter
maxPoolSize + virtualQueueSize + maxPoolSize
tarefas. Por que é que? Por causa dosemaphore.release()
bloco no último. Se todos os threads do pool chamarem essa instrução ao mesmo tempo, asmaxPoolSize
permissões serão liberadas, permitindo que o mesmo número de tarefas entre na unidade. Se estivéssemos usando uma fila limitada, ela ainda estaria cheia, resultando em uma tarefa rejeitada. Agora, como sabemos que isso ocorre apenas quando um thread de pool está quase pronto, isso não é um problema. Sabemos que o encadeamento do pool não será bloqueado, portanto, uma tarefa logo será retirada da fila.No entanto, você pode usar uma fila limitada. Apenas certifique-se de que seu tamanho seja igual
virtualQueueSize + maxPoolSize
. Tamanhos maiores são inúteis, o semáforo impedirá a entrada de mais itens. Tamanhos menores resultarão em tarefas rejeitadas. A chance de as tarefas serem rejeitadas aumenta conforme o tamanho diminui. Por exemplo, digamos que você queira um executor limitado com maxPoolSize = 2 e virtualQueueSize = 5. Em seguida, pegue um semáforo com 5 + 2 = 7 licenças e um tamanho de fila real de 5 + 2 = 7. O número real de tarefas que podem estar na unidade é 2 + 5 + 2 = 9. Quando o executor está cheio (5 tarefas na fila, 2 no pool de threads, portanto, 0 licenças disponíveis) e TODAS as threads do pool liberam suas permissões, então exatamente 2 licenças podem ser obtidas pelas tarefas que chegam.Agora, a solução do JCiP é um tanto complicada de usar, pois não impõe todas essas restrições (fila ilimitada ou limitada por essas restrições matemáticas, etc.). Acho que isso serve apenas como um bom exemplo para demonstrar como você pode construir novas classes thread-safe com base nas partes que já estão disponíveis, mas não como uma classe totalmente desenvolvida e reutilizável. Não creio que seja esta a intenção do autor.
fonte
você pode usar um RejectedExecutionHandler personalizado como este
fonte
Crie sua própria fila de bloqueio para ser usada pelo Executor, com o comportamento de bloqueio que você está procurando, enquanto sempre retorna a capacidade restante disponível (garantindo que o executor não tente criar mais threads do que seu pool principal ou acionar o manipulador de rejeição).
Acredito que isso lhe dará o comportamento de bloqueio que você está procurando. Um manipulador de rejeição nunca caberá a conta, pois isso indica que o executor não pode executar a tarefa. O que eu poderia imaginar é que você obtém alguma forma de 'espera ocupada' no manipulador. Não é isso que você quer, você quer uma fila para o executor que bloqueie o chamador ...
fonte
ThreadPoolExecutor
usa ooffer
método para adicionar tarefas à fila. Se eu criasse um costumeBlockingQueue
que bloqueiaoffer
, isso quebrariaBlockingQueue
o contrato.ThreadPoolExecutor
foi implementado para usaroffer
e nãoput
(a versão de bloqueio)? Além disso, se houvesse uma maneira de o código do cliente dizer qual usar quando, muitas pessoas tentando criar soluções personalizadas à mão ficariam aliviadasPara evitar problemas com a solução @FixPoint. Pode-se usar ListeningExecutorService e liberar o semáforo onSuccess e onFailure dentro de FutureCallback.
fonte
Runnable
pois esses métodos ainda são chamados antes da limpeza do trabalhador no normalThreadPoolExecutor
. Ou seja, você ainda precisará lidar com as exceções de rejeição.Recentemente descobri que esta questão tinha o mesmo problema. O OP não diz isso explicitamente, mas não queremos usar o
RejectedExecutionHandler
que executa uma tarefa no thread do remetente, porque isso subutilizará os threads de trabalho se essa tarefa for longa.Lendo todas as respostas e comentários, em particular a solução falha com o semáforo ou usando
afterExecute
dei uma olhada mais de perto no código do ThreadPoolExecutor para ver se existe alguma saída. Fiquei surpreso ao ver que existem mais de 2.000 linhas de código (comentadas), algumas das quais me deixam tonto . Dado o requisito bastante simples que realmente tenho - um produtor, vários consumidores, deixe o produtor bloquear quando nenhum consumidor puder trabalhar - decidi lançar minha própria solução. Não é um,ExecutorService
mas apenas umExecutor
. E não adapta o número de threads à carga de trabalho, mas mantém apenas um número fixo de threads, o que também atende aos meus requisitos. Aqui está o código. Sinta-se à vontade para reclamar :-)fonte
Acredito que haja uma maneira bastante elegante de resolver esse problema usando
java.util.concurrent.Semaphore
e delegando o comportamento deExecutor.newFixedThreadPool
. O novo serviço executor só executará uma nova tarefa quando houver um thread para fazê-lo. O bloqueio é gerenciado pelo Semaphore com número de permissões igual ao número de threads. Quando uma tarefa é concluída, ele retorna uma licença.fonte
Eu tive a mesma necessidade no passado: uma espécie de fila de bloqueio com um tamanho fixo para cada cliente apoiado por um pool de thread compartilhado. Acabei escrevendo meu próprio tipo de ThreadPoolExecutor:
UserThreadPoolExecutor (fila de bloqueio (por cliente) + threadpool (compartilhado entre todos os clientes))
Veja: https://github.com/d4rxh4wx/UserThreadPoolExecutor
Cada UserThreadPoolExecutor recebe um número máximo de threads de um ThreadPoolExecutor compartilhado
Cada UserThreadPoolExecutor pode:
fonte
Encontrei essa política de rejeição no cliente de pesquisa elástica. Ele bloqueia o thread do chamador na fila de bloqueio. Código abaixo-
fonte
Recentemente, tive a necessidade de alcançar algo semelhante, mas em a
ScheduledExecutorService
.Eu também tive que garantir que lidei com o atraso que está sendo passado no método e que a tarefa seja enviada para ser executada no momento em que o chamador espera ou apenas falhe, lançando um
RejectedExecutionException
.Outros métodos
ScheduledThreadPoolExecutor
para executar ou enviar uma tarefa chamam internamente#schedule
que, por sua vez, ainda invocará os métodos substituídos.Eu tenho o código aqui, agradecemos qualquer feedback. https://github.com/AmitabhAwasthi/BlockingScheduler
fonte
Aqui está a solução que parece funcionar muito bem. É chamado de NotifyingBlockingThreadPoolExecutor .
Programa de demonstração.
Edit: há um problema com este código, o método await () é bugado. Chamar shutdown () + awaitTermination () parece funcionar bem.
fonte
Nem sempre gosto da CallerRunsPolicy, especialmente porque ela permite que a tarefa rejeitada 'pule a fila' e seja executada antes das tarefas enviadas anteriormente. Além disso, executar a tarefa no encadeamento de chamada pode demorar muito mais do que esperar que o primeiro slot fique disponível.
Resolvi esse problema usando um RejectedExecutionHandler personalizado, que simplesmente bloqueia o thread de chamada por um tempo e depois tenta enviar a tarefa novamente:
Esta classe só pode ser usada no executor do pool de threads como um RejectedExecutinHandler como qualquer outra, por exemplo:
A única desvantagem que vejo é que o thread de chamada pode ficar bloqueado um pouco mais do que o estritamente necessário (até 250 ms). Além disso, como esse executor está efetivamente sendo chamado recursivamente, esperas muito longas para que um thread se torne disponível (horas) pode resultar em um estouro de pilha.
No entanto, eu pessoalmente gosto desse método. É compacto, fácil de entender e funciona bem.
fonte