ExecutorService que interrompe tarefas após um tempo limite

93

Estou procurando uma implementação ExecutorService que pode ser fornecida com um tempo limite. As tarefas enviadas ao ExecutorService são interrompidas se demorarem mais que o tempo limite para serem executadas. Implementar tal besta não é uma tarefa tão difícil, mas estou me perguntando se alguém sabe de uma implementação existente.

Aqui está o que eu vim com base em algumas das discussões abaixo. Algum comentário?

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        timeoutExecutor.shutdown();
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
    }

    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            thread.interrupt();
        }
    }
}
Edward Dale
fonte
Essa é a 'hora de início' do tempo limite da hora de envio? Ou a hora em que a tarefa começa a ser executada?
Tim Bender
Boa pergunta. Quando começa a ser executado. Presumivelmente usando o protected void beforeExecute(Thread t, Runnable r)gancho.
Edward Dale
@ scompt.com você ainda está usando essa solução ou ela foi substituída
Paul Taylor
@PaulTaylor O trabalho onde implementei esta solução foi substituído. :-)
Edward Dale
Preciso exatamente disso, exceto a) Preciso que meu serviço de agendador principal seja um pool de threads com um único thread de serviço, pois preciso que minhas tarefas sejam executadas estritamente simultaneamente eb) Preciso ser capaz de especificar a duração do tempo limite para cada tarefa no hora em que a tarefa é enviada. Tentei usar isso como ponto de partida, mas estendendo ScheduledThreadPoolExecutor, mas não consigo ver uma maneira de obter a duração do tempo limite especificado que deve ser especificado no momento do envio da tarefa até o método beforeExecute. Todas as sugestões apreciadas com gratidão!
Michael Ellis de

Respostas:

89

Você pode usar um ScheduledExecutorService para isso. Primeiro, você o enviaria apenas uma vez para começar imediatamente e reter o futuro que foi criado. Depois disso, você pode enviar uma nova tarefa que cancelaria o futuro retido após algum tempo.

 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 
 final Future handler = executor.submit(new Callable(){ ... });
 executor.schedule(new Runnable(){
     public void run(){
         handler.cancel();
     }      
 }, 10000, TimeUnit.MILLISECONDS);

Isso executará seu manipulador (funcionalidade principal a ser interrompida) por 10 segundos e, em seguida, cancelará (ou seja, interromperá) essa tarefa específica.

John Vint
fonte
12
Ideia interessante, mas e se a tarefa terminar antes do tempo limite (o que normalmente acontecerá)? Eu prefiro não ter toneladas de tarefas de limpeza esperando para serem executadas apenas para descobrir que sua tarefa atribuída já foi concluída. Seria necessário haver outro thread monitorando os Futures enquanto eles terminam de remover suas tarefas de limpeza.
Edward Dale
3
O executor irá agendar este cancelamento apenas uma vez. Se a tarefa for concluída, o cancelamento é uma operação sem operação e o trabalho continua inalterado. Só precisa haver um esquema de thread extra para cancelar as tarefas e um thread para executá-las. Você poderia ter dois executores, um para enviar suas tarefas principais e outro para cancelá-las.
John Vint
3
Isso é verdade, mas e se o tempo limite for de 5 horas e, nesse tempo, 10k tarefas forem executadas. Eu gostaria de evitar ter todos aqueles autônomos ocupando memória e causando mudanças de contexto.
Edward Dale
1
@Scompt Não necessariamente. Haveria 10k invocações de future.cancel (), no entanto, se o futuro for concluído, o cancelamento será executado rapidamente e não fará nenhum trabalho desnecessário. Se você não quiser 10k chamadas de cancelamento extras, isso pode não funcionar, mas a quantidade de trabalho realizada quando uma tarefa é concluída é muito pequena.
John Vint
6
@John W .: Acabei de perceber outro problema com sua implementação. Preciso que o tempo limite comece quando a tarefa começa a ser executada, como comentei anteriormente. Acho que a única maneira de fazer isso é usar o beforeExecuteanzol.
Edward Dale
6

Infelizmente, a solução é falha. Há uma espécie de bug com ScheduledThreadPoolExecutor, também relatado nesta pergunta : cancelar uma tarefa enviada não libera totalmente os recursos de memória associados à tarefa; os recursos são liberados somente quando a tarefa expira.

Portanto, se você criar um TimeoutThreadPoolExecutorcom um tempo de expiração bastante longo (um uso típico) e enviar tarefas com rapidez suficiente, acabará enchendo a memória - mesmo que as tarefas tenham sido concluídas com sucesso.

Você pode ver o problema com o seguinte programa de teste (muito bruto):

public static void main(String[] args) throws InterruptedException {
    ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 
            new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES);
    //ExecutorService service = Executors.newFixedThreadPool(1);
    try {
        final AtomicInteger counter = new AtomicInteger();
        for (long i = 0; i < 10000000; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    counter.incrementAndGet();
                }
            });
            if (i % 10000 == 0) {
                System.out.println(i + "/" + counter.get());
                while (i > counter.get()) {
                    Thread.sleep(10);
                }
            }
        }
    } finally {
        service.shutdown();
    }
}

O programa esgota a memória disponível, embora espere que os Runnables gerados sejam concluídos.

Pensei um pouco sobre isso, mas infelizmente não consegui encontrar uma boa solução.

EDITAR: descobri que esse problema foi relatado como bug do JDK 6602600 e parece ter sido corrigido recentemente.

Flavio
fonte
4

Envolva a tarefa em FutureTask e você pode especificar o tempo limite para FutureTask. Veja o exemplo em minha resposta a esta pergunta,

tempo limite do processo nativo java

ZZ Coder
fonte
1
Sei que existem algumas maneiras de fazer isso usando as java.util.concurrentclasses, mas estou procurando uma ExecutorServiceimplementação.
Edward Dale
1
Se você está dizendo que deseja que seu ExecutorService oculte o fato de que os tempos limites estão sendo adicionados do código do cliente, você pode implementar seu próprio ExecutorService que envolve cada executável entregue a ele com um FutureTask antes de executá-los.
erikprice
2

Depois de muito tempo para pesquisar,
finalmente, uso o invokeAllmétodo de ExecutorServicepara resolver esse problema.
Isso interromperá estritamente a tarefa durante a execução da tarefa.
Aqui está um exemplo

ExecutorService executorService = Executors.newCachedThreadPool();

try {
    List<Callable<Object>> callables = new ArrayList<>();
    // Add your long time task (callable)
    callables.add(new VaryLongTimeTask());
    // Assign tasks for specific execution timeout (e.g. 2 sec)
    List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS);
    for (Future<Object> future : futures) {
        // Getting result
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

executorService.shutdown();

O pro é que você também pode enviar ListenableFutureao mesmo ExecutorService.
Apenas altere ligeiramente a primeira linha de código.

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

ListeningExecutorServiceé o recurso de escuta do ExecutorServiceno projeto google guava ( com.google.guava ))

Johnny
fonte
2
Obrigado por apontar invokeAll. Isso funciona muito bem. Apenas uma palavra de cautela para quem está pensando em usar isso: embora invokeAllretorne uma lista de Futureobjetos, na verdade parece ser uma operação de bloqueio.
mxro
1

Parece que o problema não está no bug 6602600 do JDK (ele foi resolvido em 22/05/2010), mas na chamada incorreta de hibernação (10) no círculo. Além disso, observe que o Thread principal deve dar CHANCE diretamente a outros threads para realizar suas tarefas invocando SLEEP (0) em CADA ramo do círculo externo. É melhor, eu acho, usar Thread.yield () em vez de Thread.sleep (0)

A parte corrigida do resultado do código do problema anterior é assim:

.......................
........................
Thread.yield();         

if (i % 1000== 0) {
System.out.println(i + "/" + counter.get()+ "/"+service.toString());
}

//                
//                while (i > counter.get()) {
//                    Thread.sleep(10);
//                } 

Funciona corretamente com quantidade de contador externo de até 150 milhões de círculos testados.

Sergey
fonte
1

Usando a resposta de John W, criei uma implementação que inicia corretamente o tempo limite quando a tarefa inicia sua execução. Eu até escrevo um teste de unidade para ele :)

No entanto, não atende às minhas necessidades, já que algumas operações IO não interrompem quando Future.cancel()é chamado (ou seja, quando Thread.interrupt()é chamado). Alguns exemplos de operação IO que não podem ser interrompidos quando Thread.interrupt()é chamada são Socket.connecte Socket.read(e eu suspeito que a maioria das operações IO implementadas em java.io). Todas as operações IO em java.niodevem ser interrompíveis quando Thread.interrupt()for chamado. Por exemplo, esse é o caso de SocketChannel.opene SocketChannel.read.

De qualquer forma, se alguém estiver interessado, criei uma essência para um executor de pool de threads que permite que as tarefas expirem (se estiverem usando operações interruptíveis ...): https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf

amanteaux
fonte
Código interessante, coloquei-o em meu sistema e estou curioso para saber se você tem alguns exemplos de que tipo de operações de IO não serão interrompidas para que eu possa ver se isso afetará meu sistema. Obrigado!
Duncan Krebs
@DuncanKrebs Eu detalhei minha resposta com um exemplo de IO ininterrupto: Socket.connecteSocket.read
amanteaux
myThread.interrupted()não é o método correto para interromper, pois APAGA o sinalizador de interrupção. Use em seu myThread.interrupt()lugar, e isso deveria com soquetes
DanielCuadra
@DanielCuadra: Obrigado, parece que cometi um erro de digitação, pois Thread.interrupted()não permite interromper um tópico. Porém, Thread.interrupt()não interrompe as java.iooperações, funciona apenas nas java.niooperações.
amanteaux
Eu tenho usado interrupt()por muitos anos e sempre interrompeu as operações java.io (bem como outros métodos de bloqueio, como thread sleep, conexões jdbc, blockingqueue take, etc). Talvez você tenha encontrado uma classe com bugs ou alguma JVM que tenha bugs
DanielCuadra
0

E esta ideia alternativa:

  • dois têm dois executores:
    • um para :
      • enviar a tarefa, sem se preocupar com o tempo limite da tarefa
      • adicionando o resultado Futuro e o momento em que deve terminar em uma estrutura interna
    • um para a execução de um trabalho interno que é verificar na estrutura interna se algumas tarefas estão com timeout e se devem ser canceladas.

Uma pequena amostra está aqui:

public class AlternativeExecutorService 
{

private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue       = new CopyOnWriteArrayList();
private final ScheduledThreadPoolExecutor                scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job
private final ListeningExecutorService                   threadExecutor    = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for
private ScheduledFuture scheduledFuture;
private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L;

public AlternativeExecutorService()
{
    scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS);
}

public void pushTask(OwnTask task)
{
    ListenableFuture<Void> future = threadExecutor.submit(task);  // -> create your Callable
    futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end
}

public void shutdownInternalScheduledExecutor()
{
    scheduledFuture.cancel(true);
    scheduledExecutor.shutdownNow();
}

long getCurrentMillisecondsTime()
{
    return Calendar.getInstance().get(Calendar.MILLISECOND);
}

class ListenableFutureTask
{
    private final ListenableFuture<Void> future;
    private final OwnTask                task;
    private final long                   milliSecEndTime;

    private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime)
    {
        this.future = future;
        this.task = task;
        this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS);
    }

    ListenableFuture<Void> getFuture()
    {
        return future;
    }

    OwnTask getTask()
    {
        return task;
    }

    long getMilliSecEndTime()
    {
        return milliSecEndTime;
    }
}

class TimeoutManagerJob implements Runnable
{
    CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList()
    {
        return futureQueue;
    }

    @Override
    public void run()
    {
        long currentMileSecValue = getCurrentMillisecondsTime();
        for (ListenableFutureTask futureTask : futureQueue)
        {
            consumeFuture(futureTask, currentMileSecValue);
        }
    }

    private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue)
    {
        ListenableFuture<Void> future = futureTask.getFuture();
        boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue;
        if (isTimeout)
        {
            if (!future.isDone())
            {
                future.cancel(true);
            }
            futureQueue.remove(futureTask);
        }
    }
}

class OwnTask implements Callable<Void>
{
    private long     timeoutDuration;
    private TimeUnit timeUnit;

    OwnTask(long timeoutDuration, TimeUnit timeUnit)
    {
        this.timeoutDuration = timeoutDuration;
        this.timeUnit = timeUnit;
    }

    @Override
    public Void call() throws Exception
    {
        // do logic
        return null;
    }

    public long getTimeoutDuration()
    {
        return timeoutDuration;
    }

    public TimeUnit getTimeUnit()
    {
        return timeUnit;
    }
}
}
Ionut Mesaros
fonte
0

verifique se isso funciona para você,

    public <T,S,K,V> ResponseObject<Collection<ResponseObject<T>>> runOnScheduler(ThreadPoolExecutor threadPoolExecutor,
      int parallelismLevel, TimeUnit timeUnit, int timeToCompleteEachTask, Collection<S> collection,
      Map<K,V> context, Task<T,S,K,V> someTask){
    if(threadPoolExecutor==null){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("threadPoolExecutor can not be null").build();
    }
    if(someTask==null){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("Task can not be null").build();
    }
    if(CollectionUtils.isEmpty(collection)){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("input collection can not be empty").build();
    }

    LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue = new LinkedBlockingQueue<>(collection.size());
    collection.forEach(value -> {
      callableLinkedBlockingQueue.offer(()->someTask.perform(value,context)); //pass some values in callable. which can be anything.
    });
    LinkedBlockingQueue<Future<T>> futures = new LinkedBlockingQueue<>();

    int count = 0;

    while(count<parallelismLevel && count < callableLinkedBlockingQueue.size()){
      Future<T> f = threadPoolExecutor.submit(callableLinkedBlockingQueue.poll());
      futures.offer(f);
      count++;
    }

    Collection<ResponseObject<T>> responseCollection = new ArrayList<>();

    while(futures.size()>0){
      Future<T> future = futures.poll();
      ResponseObject<T> responseObject = null;
        try {
          T response = future.get(timeToCompleteEachTask, timeUnit);
          responseObject = ResponseObject.<T>builder().data(response).build();
        } catch (InterruptedException e) {
          future.cancel(true);
        } catch (ExecutionException e) {
          future.cancel(true);
        } catch (TimeoutException e) {
          future.cancel(true);
        } finally {
          if (Objects.nonNull(responseObject)) {
            responseCollection.add(responseObject);
          }
          futures.remove(future);//remove this
          Callable<T> callable = getRemainingCallables(callableLinkedBlockingQueue);
          if(null!=callable){
            Future<T> f = threadPoolExecutor.submit(callable);
            futures.add(f);
          }
        }

    }
    return ResponseObject.<Collection<ResponseObject<T>>>builder().data(responseCollection).build();
  }

  private <T> Callable<T> getRemainingCallables(LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue){
    if(callableLinkedBlockingQueue.size()>0){
      return callableLinkedBlockingQueue.poll();
    }
    return null;
  }

você pode restringir o número de uso de thread do planejador, bem como colocar o tempo limite na tarefa.

Nitish Kumar
fonte