Estou procurando uma implementação ExecutorService que pode ser fornecida com um tempo limite. As tarefas enviadas ao ExecutorService são interrompidas se demorarem mais que o tempo limite para serem executadas. Implementar tal besta não é uma tarefa tão difícil, mas estou me perguntando se alguém sabe de uma implementação existente.
Aqui está o que eu vim com base em algumas das discussões abaixo. Algum comentário?
import java.util.List;
import java.util.concurrent.*;
public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
private final long timeout;
private final TimeUnit timeoutUnit;
private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
@Override
public void shutdown() {
timeoutExecutor.shutdown();
super.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
timeoutExecutor.shutdownNow();
return super.shutdownNow();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if(timeout > 0) {
final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
runningTasks.put(r, scheduled);
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask != null) {
timeoutTask.cancel(false);
}
}
class TimeoutTask implements Runnable {
private final Thread thread;
public TimeoutTask(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
thread.interrupt();
}
}
}
java
multithreading
concurrency
executorservice
Edward Dale
fonte
fonte
protected void beforeExecute(Thread t, Runnable r)
gancho.Respostas:
Você pode usar um ScheduledExecutorService para isso. Primeiro, você o enviaria apenas uma vez para começar imediatamente e reter o futuro que foi criado. Depois disso, você pode enviar uma nova tarefa que cancelaria o futuro retido após algum tempo.
Isso executará seu manipulador (funcionalidade principal a ser interrompida) por 10 segundos e, em seguida, cancelará (ou seja, interromperá) essa tarefa específica.
fonte
beforeExecute
anzol.Infelizmente, a solução é falha. Há uma espécie de bug com
ScheduledThreadPoolExecutor
, também relatado nesta pergunta : cancelar uma tarefa enviada não libera totalmente os recursos de memória associados à tarefa; os recursos são liberados somente quando a tarefa expira.Portanto, se você criar um
TimeoutThreadPoolExecutor
com um tempo de expiração bastante longo (um uso típico) e enviar tarefas com rapidez suficiente, acabará enchendo a memória - mesmo que as tarefas tenham sido concluídas com sucesso.Você pode ver o problema com o seguinte programa de teste (muito bruto):
O programa esgota a memória disponível, embora espere que os
Runnable
s gerados sejam concluídos.Pensei um pouco sobre isso, mas infelizmente não consegui encontrar uma boa solução.
EDITAR: descobri que esse problema foi relatado como bug do JDK 6602600 e parece ter sido corrigido recentemente.
fonte
Envolva a tarefa em FutureTask e você pode especificar o tempo limite para FutureTask. Veja o exemplo em minha resposta a esta pergunta,
tempo limite do processo nativo java
fonte
java.util.concurrent
classes, mas estou procurando umaExecutorService
implementação.Depois de muito tempo para pesquisar,
finalmente, uso o
invokeAll
método deExecutorService
para resolver esse problema.Isso interromperá estritamente a tarefa durante a execução da tarefa.
Aqui está um exemplo
O pro é que você também pode enviar
ListenableFuture
ao mesmoExecutorService
.Apenas altere ligeiramente a primeira linha de código.
ListeningExecutorService
é o recurso de escuta doExecutorService
no projeto google guava ( com.google.guava ))fonte
invokeAll
. Isso funciona muito bem. Apenas uma palavra de cautela para quem está pensando em usar isso: emborainvokeAll
retorne uma lista deFuture
objetos, na verdade parece ser uma operação de bloqueio.Que tal usar o
ExecutorService.shutDownNow()
método descrito em http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html ? Parece ser a solução mais simples.fonte
Parece que o problema não está no bug 6602600 do JDK (ele foi resolvido em 22/05/2010), mas na chamada incorreta de hibernação (10) no círculo. Além disso, observe que o Thread principal deve dar CHANCE diretamente a outros threads para realizar suas tarefas invocando SLEEP (0) em CADA ramo do círculo externo. É melhor, eu acho, usar Thread.yield () em vez de Thread.sleep (0)
A parte corrigida do resultado do código do problema anterior é assim:
Funciona corretamente com quantidade de contador externo de até 150 milhões de círculos testados.
fonte
Usando a resposta de John W, criei uma implementação que inicia corretamente o tempo limite quando a tarefa inicia sua execução. Eu até escrevo um teste de unidade para ele :)
No entanto, não atende às minhas necessidades, já que algumas operações IO não interrompem quando
Future.cancel()
é chamado (ou seja, quandoThread.interrupt()
é chamado). Alguns exemplos de operação IO que não podem ser interrompidos quandoThread.interrupt()
é chamada sãoSocket.connect
eSocket.read
(e eu suspeito que a maioria das operações IO implementadas emjava.io
). Todas as operações IO emjava.nio
devem ser interrompíveis quandoThread.interrupt()
for chamado. Por exemplo, esse é o caso deSocketChannel.open
eSocketChannel.read
.De qualquer forma, se alguém estiver interessado, criei uma essência para um executor de pool de threads que permite que as tarefas expirem (se estiverem usando operações interruptíveis ...): https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf
fonte
Socket.connect
eSocket.read
myThread.interrupted()
não é o método correto para interromper, pois APAGA o sinalizador de interrupção. Use em seumyThread.interrupt()
lugar, e isso deveria com soquetesThread.interrupted()
não permite interromper um tópico. Porém,Thread.interrupt()
não interrompe asjava.io
operações, funciona apenas nasjava.nio
operações.interrupt()
por muitos anos e sempre interrompeu as operações java.io (bem como outros métodos de bloqueio, como thread sleep, conexões jdbc, blockingqueue take, etc). Talvez você tenha encontrado uma classe com bugs ou alguma JVM que tenha bugsE esta ideia alternativa:
Uma pequena amostra está aqui:
fonte
verifique se isso funciona para você,
você pode restringir o número de uso de thread do planejador, bem como colocar o tempo limite na tarefa.
fonte