Como fazer o bloco do método submit () de ThreadPoolExecutor se ele estiver saturado?

102

Quero criar um ThreadPoolExecutortal que, quando atingir seu tamanho máximo e a fila estiver cheia, o submit()método bloqueie ao tentar adicionar novas tarefas. Preciso implementar um personalizado RejectedExecutionHandlerpara isso ou existe uma maneira de fazer isso usando uma biblioteca Java padrão?

Ponto fixo
fonte
2
O que você deseja é semelhante ao método offer () da fila de bloqueio de Array?
extraneon
2
@bacar eu discordo. Este Q&A parece mais valioso (além de ser mais antigo).
JasonMArcher

Respostas:

47

Uma das soluções possíveis que acabei de encontrar:

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}

Existem outras soluções? Eu prefiro algo baseado em, RejectedExecutionHandlerpois parece uma maneira padrão de lidar com essas situações.

Ponto fixo
fonte
2
Existe uma condição de corrida aqui entre o ponto em que o semáforo é liberado na cláusula finally e o semáforo é adquirido?
Volni
2
Conforme mencionado acima, essa implementação é falha porque o semáforo é liberado antes da conclusão da tarefa. Seria melhor usar o método java.util.concurrent.ThreadPoolExecutor # afterExecute (Runnable, Throwable)
FelixM
2
@FelixM: usar java.util.concurrent.ThreadPoolExecutor # afterExecute (Runnable, Throwable) não resolverá o problema porque afterExecute é chamado imediatamente após task.run () em java.util.concurrent.ThreadPoolExecutor # runWorker (Worker w), antes pegando o próximo elemento da fila (olhando o código-fonte do openjdk 1.7.0.6).
Jaan
1
Esta resposta é do livro Java Concurrency in Practice de Brian Goetz
orangepips
11
Esta resposta não é totalmente correta, também são os comentários. Esse trecho de código realmente vem do Java Concurrency in Practice e está correto se você levar em consideração o contexto . O livro afirma claramente, literalmente: "Em tal abordagem, use uma fila ilimitada (...) e defina o limite no semáforo para ser igual ao tamanho do pool mais o número de tarefas em fila que você deseja permitir". Com uma fila ilimitada, as tarefas nunca serão rejeitadas, portanto, relançar a exceção é totalmente inútil! Essa é, acredito, também a razão pela qual NÃOthrow e; está no livro. JCIP está correto!
Timmos
30

Você pode usar ThreadPoolExecutor e um BlockQueue:

public class ImageManager {
    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
    RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
    private ExecutorService executorService =  new ThreadPoolExecutor(numOfThread, numOfThread, 
        0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);

    private int downloadThumbnail(String fileListPath){
        executorService.submit(new yourRunnable());
    }
}
DiTime4Tea
fonte
Eu só gostaria de dizer que essa foi uma solução incrivelmente rápida e fácil de implementar que funcionou muito bem!
Ivan
58
Isso executa tarefas rejeitadas no tópico de envio. Que funcionalmente não atende aos requisitos do OP.
Percepção de
4
Isso executa a tarefa "no encadeamento de chamada" em vez de bloquear para colocá-la na fila, o que pode ter alguns efeitos adversos, como se vários encadeamentos chamem dessa forma, mais do que trabalhos de "tamanho de fila" estarão em execução, e se Se a tarefa demorar mais do que o esperado, seu encadeamento de "produção" pode não manter o executor ocupado. Mas funcionou muito bem aqui!
rogerdpack de
4
Downvoted: não bloqueia quando o TPE está saturado. Esta é apenas uma alternativa, não uma solução.
Timmos de
1
Votos positivos: isso se encaixa perfeitamente no 'design do TPE' e 'bloqueia naturalmente' os encadeamentos do cliente por dar a eles tarefas excessivas para fazer. Isso deve abranger a maioria dos casos de uso, mas não todos, é claro, e você deve entender o que está acontecendo nos bastidores.
Mike
12

Você deve usar o CallerRunsPolicy, que executa a tarefa rejeitada no thread de chamada. Dessa forma, ele não pode enviar nenhuma tarefa nova ao executor até que a tarefa seja concluída, quando então haverá alguns threads de pool livres ou o processo se repetirá.

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

Dos documentos:

Tarefas rejeitadas

Novas tarefas enviadas no método execute (java.lang.Runnable) serão rejeitadas quando o Executor for desligado e também quando o Executor usar limites finitos para threads máximos e capacidade da fila de trabalho e estiver saturado. Em ambos os casos, o método execute chama o método RejectedExecutionHandler.rejectedExecution (java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) de seu RejectedExecutionHandler. Quatro políticas de manipulador predefinidas são fornecidas:

  1. No ThreadPoolExecutor.AbortPolicy padrão, o manipulador lança um RejectedExecutionException de tempo de execução na rejeição.
  2. Em ThreadPoolExecutor.CallerRunsPolicy, o encadeamento que invoca se executa executa a tarefa. Isso fornece um mecanismo de controle de feedback simples que diminuirá a taxa de envio de novas tarefas.
  3. Em ThreadPoolExecutor.DiscardPolicy, uma tarefa que não pode ser executada é simplesmente descartada.
  4. Em ThreadPoolExecutor.DiscardOldestPolicy, se o executor não for encerrado, a tarefa no topo da fila de trabalho será descartada e a execução será repetida (o que pode falhar novamente, fazendo com que isso seja repetido).

Além disso, certifique-se de usar uma fila limitada, como ArrayBlockingQueue, ao chamar o ThreadPoolExecutorconstrutor. Caso contrário, nada será rejeitado.

Editar: em resposta ao seu comentário, defina o tamanho de ArrayBlockingQueue para ser igual ao tamanho máximo do pool de threads e use a AbortPolicy.

Edit 2: Ok, vejo o que você quer chegar. Que tal isso: sobrescrever o beforeExecute()método para verificar getActiveCount()se não excede getMaximumPoolSize()e, se exceder , durma e tente novamente?

danben
fonte
3
Eu quero que o número de tarefas executadas simultaneamente seja estritamente limitado (pelo número de threads no Executor), é por isso que não posso permitir que os threads do chamador executem essas tarefas sozinhos.
Ponto fixo de
1
AbortPolicy faria com que o thread do chamador recebesse uma RejectedExecutionException, embora eu precise apenas bloquear.
Ponto fixo de
2
Estou pedindo bloqueio, não suspensão e votação;)
Ponto de correção
@danben: Você não quer dizer CallerRunsPolicy ?
user359996
7
O problema com CallerRunPolicy é que se você tiver um único produtor de thread, você frequentemente terá threads não sendo usados ​​se uma tarefa de longa execução for rejeitada (porque as outras tarefas no pool de threads serão concluídas enquanto a tarefa de longa execução ainda estiver corrida).
Adam Gent,
6

O Hibernate tem um BlockPolicyque é simples e pode fazer o que você quiser:

Veja: Executors.java

/**
 * A handler for rejected tasks that will have the caller block until
 * space is available.
 */
public static class BlockPolicy implements RejectedExecutionHandler {

    /**
     * Creates a <tt>BlockPolicy</tt>.
     */
    public BlockPolicy() { }

    /**
     * Puts the Runnable to the blocking queue, effectively blocking
     * the delegating thread until space is available.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        try {
            e.getQueue().put( r );
        }
        catch (InterruptedException e1) {
            log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
        }
    }
}
Nate Murray
fonte
4
Pensando bem, é uma péssima ideia. Eu não recomendo que você use. Para boas razões, veja aqui: stackoverflow.com/questions/3446011/…
Nate Murray
Além disso, não está usando a "biblioteca Java padrão", conforme solicitação do OP. Excluir?
user359996
1
Uau, isso é tão feio. Basicamente, esta solução interfere com os internos do TPE. O javadoc para ThreadPoolExecutoreven diz literalmente: "O método getQueue () permite o acesso à fila de trabalho para fins de monitoramento e depuração. O uso desse método para qualquer outro propósito é fortemente desencorajado.". É absolutamente triste ver que isso está disponível em uma biblioteca tão conhecida.
Timmos
1
com.amazonaws.services.simpleworkflow.flow.worker.BlockCallerPolicy é semelhante.
Adrian Baker
6

A BoundedExecutorresposta citada acima do Java Concurrency in Practice só funciona corretamente se você usar uma fila ilimitada para o Executor ou se o limite do semáforo não for maior que o tamanho da fila. O semáforo é o estado compartilhado entre a thread de envio e as threads no pool, tornando possível saturar o executor mesmo se o tamanho da fila <limite <= (tamanho da fila + tamanho do pool).

O uso CallerRunsPolicysó é válido se suas tarefas não forem executadas para sempre, caso em que seu tópico de envio permanecerá para rejectedExecutionsempre, e uma má ideia se suas tarefas demorarem para serem executadas, porque o tópico de envio não pode enviar novas tarefas ou faça qualquer outra coisa se estiver executando uma tarefa em si.

Se isso não for aceitável, sugiro verificar o tamanho da fila limitada do executor antes de enviar uma tarefa. Se a fila estiver cheia, aguarde um pouco antes de tentar enviar novamente. O rendimento será afetado, mas sugiro que seja uma solução mais simples do que muitas das outras soluções propostas e você tem a garantia de que nenhuma tarefa será rejeitada.

stephan f
fonte
Não tenho certeza de como verificar o comprimento da fila antes de enviar garante que não haja tarefas rejeitadas em um ambiente multithread com vários produtores de tarefas. Isso não parece thread-safe.
Tim
5

Eu sei, é um hack, mas na minha opinião o hack mais limpo entre os oferecidos aqui ;-)

Como ThreadPoolExecutor usa a fila de bloqueio "oferta" em vez de "colocar", permite substituir o comportamento de "oferta" da fila de bloqueio:

class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {

    BlockingQueueHack(int size) {
        super(size);
    }

    public boolean offer(T task) {
        try {
            this.put(task);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return true;
    }
}

ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));

Eu testei e parece funcionar. A implementação de alguma política de tempo limite é deixada como um exercício do leitor.

Jakub
fonte
Consulte stackoverflow.com/a/4522411/2601671 para obter uma versão limpa disso. Eu concordo, é a maneira mais limpa de fazer isso.
Trenton
3

A classe a seguir envolve um ThreadPoolExecutor e usa um Semaphore para bloquear a fila de trabalho está cheia:

public final class BlockingExecutor { 

    private final Executor executor;
    private final Semaphore semaphore;

    public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
        this.semaphore = new Semaphore(queueSize + maxPoolSize);
    }

    private void execImpl (final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            // will never be thrown with an unbounded buffer (LinkedBlockingQueue)
            semaphore.release();
            throw e;
        }
    }

    public void execute (Runnable command) throws InterruptedException {
        execImpl(command);
    }
}

Essa classe de wrapper é baseada em uma solução fornecida no livro Java Concurrency in Practice de Brian Goetz. A solução no livro leva apenas dois parâmetros do construtor: um Executore um limite usado para o semáforo. Isso é mostrado na resposta dada por Fixpoint. Há um problema com essa abordagem: ela pode chegar a um estado em que os threads do pool estão ocupados, a fila está cheia, mas o semáforo acaba de liberar uma licença. ( semaphore.release()no bloco finalmente). Nesse estado, uma nova tarefa pode obter a licença recém-liberada, mas é rejeitada porque a fila de tarefas está cheia. Claro que isso não é algo que você deseja; você deseja bloquear neste caso.

Para resolver isso, devemos usar uma fila ilimitada , como o JCiP claramente menciona. O semáforo atua como um guarda, dando o efeito de um tamanho de fila virtual. Isso tem o efeito colateral de que é possível que a unidade possa conter maxPoolSize + virtualQueueSize + maxPoolSizetarefas. Por que é que? Por causa do semaphore.release()bloco no último. Se todos os threads do pool chamarem essa instrução ao mesmo tempo, as maxPoolSizepermissões serão liberadas, permitindo que o mesmo número de tarefas entre na unidade. Se estivéssemos usando uma fila limitada, ela ainda estaria cheia, resultando em uma tarefa rejeitada. Agora, como sabemos que isso ocorre apenas quando um thread de pool está quase pronto, isso não é um problema. Sabemos que o encadeamento do pool não será bloqueado, portanto, uma tarefa logo será retirada da fila.

No entanto, você pode usar uma fila limitada. Apenas certifique-se de que seu tamanho seja igual virtualQueueSize + maxPoolSize. Tamanhos maiores são inúteis, o semáforo impedirá a entrada de mais itens. Tamanhos menores resultarão em tarefas rejeitadas. A chance de as tarefas serem rejeitadas aumenta conforme o tamanho diminui. Por exemplo, digamos que você queira um executor limitado com maxPoolSize = 2 e virtualQueueSize = 5. Em seguida, pegue um semáforo com 5 + 2 = 7 licenças e um tamanho de fila real de 5 + 2 = 7. O número real de tarefas que podem estar na unidade é 2 + 5 + 2 = 9. Quando o executor está cheio (5 tarefas na fila, 2 no pool de threads, portanto, 0 licenças disponíveis) e TODAS as threads do pool liberam suas permissões, então exatamente 2 licenças podem ser obtidas pelas tarefas que chegam.

Agora, a solução do JCiP é um tanto complicada de usar, pois não impõe todas essas restrições (fila ilimitada ou limitada por essas restrições matemáticas, etc.). Acho que isso serve apenas como um bom exemplo para demonstrar como você pode construir novas classes thread-safe com base nas partes que já estão disponíveis, mas não como uma classe totalmente desenvolvida e reutilizável. Não creio que seja esta a intenção do autor.

Timmos
fonte
2

você pode usar um RejectedExecutionHandler personalizado como este

ThreadPoolExecutor tp= new ThreadPoolExecutor(core_size, // core size
                max_handlers, // max size 
                timeout_in_seconds, // idle timeout 
                TimeUnit.SECONDS, queue, new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        // This will block if the queue is full
                        try {
                            executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                            System.err.println(e.getMessage());
                        }

                    }
                });
Amir David Nissan Cohen
fonte
1
Os documentos para getQueue () mencionam explicitamente que o acesso à fila de tarefas se destina principalmente à depuração e monitoramento.
Chadi
0

Crie sua própria fila de bloqueio para ser usada pelo Executor, com o comportamento de bloqueio que você está procurando, enquanto sempre retorna a capacidade restante disponível (garantindo que o executor não tente criar mais threads do que seu pool principal ou acionar o manipulador de rejeição).

Acredito que isso lhe dará o comportamento de bloqueio que você está procurando. Um manipulador de rejeição nunca caberá a conta, pois isso indica que o executor não pode executar a tarefa. O que eu poderia imaginar é que você obtém alguma forma de 'espera ocupada' no manipulador. Não é isso que você quer, você quer uma fila para o executor que bloqueie o chamador ...

Fried Hoeben
fonte
2
ThreadPoolExecutorusa o offermétodo para adicionar tarefas à fila. Se eu criasse um costume BlockingQueueque bloqueia offer, isso quebraria BlockingQueueo contrato.
Ponto fixo
@Shooshpanchick, isso quebraria o contrato da BlockingQueues. E daí? se você estiver tão interessado, poderá ativar explicitamente o comportamento durante o envio () apenas (será necessário um ThreadLocal)
bestsss
Veja também esta resposta a outra pergunta que explica essa alternativa.
Robert Tupelo-Schneck
há um motivo pelo qual ThreadPoolExecutorfoi implementado para usar offere não put(a versão de bloqueio)? Além disso, se houvesse uma maneira de o código do cliente dizer qual usar quando, muitas pessoas tentando criar soluções personalizadas à mão ficariam aliviadas
asgs
0

Para evitar problemas com a solução @FixPoint. Pode-se usar ListeningExecutorService e liberar o semáforo onSuccess e onFailure dentro de FutureCallback.

vamsu
fonte
Isso tem os mesmos problemas inerentes de envolver apenas o, Runnablepois esses métodos ainda são chamados antes da limpeza do trabalhador no normal ThreadPoolExecutor. Ou seja, você ainda precisará lidar com as exceções de rejeição.
Adam Gent,
0

Recentemente descobri que esta questão tinha o mesmo problema. O OP não diz isso explicitamente, mas não queremos usar o RejectedExecutionHandlerque executa uma tarefa no thread do remetente, porque isso subutilizará os threads de trabalho se essa tarefa for longa.

Lendo todas as respostas e comentários, em particular a solução falha com o semáforo ou usando afterExecutedei uma olhada mais de perto no código do ThreadPoolExecutor para ver se existe alguma saída. Fiquei surpreso ao ver que existem mais de 2.000 linhas de código (comentadas), algumas das quais me deixam tonto . Dado o requisito bastante simples que realmente tenho - um produtor, vários consumidores, deixe o produtor bloquear quando nenhum consumidor puder trabalhar - decidi lançar minha própria solução. Não é um, ExecutorServicemas apenas um Executor. E não adapta o número de threads à carga de trabalho, mas mantém apenas um número fixo de threads, o que também atende aos meus requisitos. Aqui está o código. Sinta-se à vontade para reclamar :-)

package x;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;

/**
 * distributes {@code Runnable}s to a fixed number of threads. To keep the
 * code lean, this is not an {@code ExecutorService}. In particular there is
 * only very simple support to shut this executor down.
 */
public class ParallelExecutor implements Executor {
  // other bounded queues work as well and are useful to buffer peak loads
  private final BlockingQueue<Runnable> workQueue =
      new SynchronousQueue<Runnable>();
  private final Thread[] threads;

  /*+**********************************************************************/
  /**
   * creates the requested number of threads and starts them to wait for
   * incoming work
   */
  public ParallelExecutor(int numThreads) {
    this.threads = new Thread[numThreads];
    for(int i=0; i<numThreads; i++) {
      // could reuse the same Runner all over, but keep it simple
      Thread t = new Thread(new Runner());
      this.threads[i] = t;
      t.start();
    }
  }
  /*+**********************************************************************/
  /**
   * returns immediately without waiting for the task to be finished, but may
   * block if all worker threads are busy.
   * 
   * @throws RejectedExecutionException if we got interrupted while waiting
   *         for a free worker
   */
  @Override
  public void execute(Runnable task)  {
    try {
      workQueue.put(task);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RejectedExecutionException("interrupt while waiting for a free "
          + "worker.", e);
    }
  }
  /*+**********************************************************************/
  /**
   * Interrupts all workers and joins them. Tasks susceptible to an interrupt
   * will preempt their work. Blocks until the last thread surrendered.
   */
  public void interruptAndJoinAll() throws InterruptedException {
    for(Thread t : threads) {
      t.interrupt();
    }
    for(Thread t : threads) {
      t.join();
    }
  }
  /*+**********************************************************************/
  private final class Runner implements Runnable {
    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        Runnable task;
        try {
          task = workQueue.take();
        } catch (InterruptedException e) {
          // canonical handling despite exiting right away
          Thread.currentThread().interrupt(); 
          return;
        }
        try {
          task.run();
        } catch (RuntimeException e) {
          // production code to use a logging framework
          e.printStackTrace();
        }
      }
    }
  }
}
Harald
fonte
0

Acredito que haja uma maneira bastante elegante de resolver esse problema usando java.util.concurrent.Semaphoree delegando o comportamento de Executor.newFixedThreadPool. O novo serviço executor só executará uma nova tarefa quando houver um thread para fazê-lo. O bloqueio é gerenciado pelo Semaphore com número de permissões igual ao número de threads. Quando uma tarefa é concluída, ele retorna uma licença.

public class FixedThreadBlockingExecutorService extends AbstractExecutorService {

private final ExecutorService executor;
private final Semaphore blockExecution;

public FixedThreadBlockingExecutorService(int nTreads) {
    this.executor = Executors.newFixedThreadPool(nTreads);
    blockExecution = new Semaphore(nTreads);
}

@Override
public void shutdown() {
    executor.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
    return executor.shutdownNow();
}

@Override
public boolean isShutdown() {
    return executor.isShutdown();
}

@Override
public boolean isTerminated() {
    return executor.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return executor.awaitTermination(timeout, unit);
}

@Override
public void execute(Runnable command) {
    blockExecution.acquireUninterruptibly();
    executor.execute(() -> {
        try {
            command.run();
        } finally {
            blockExecution.release();
        }
    });
}
Radoslaw Kachel
fonte
Implementei o BoundedExecutor descrito em Java Concurrency in Practice e descobri que o Semaphore deve ser inicializado com o sinalizador de imparcialidade definido como true para garantir que as permissões do Semaphore sejam oferecidas nos pedidos feitos. Consulte docs.oracle.com/javase/7/docs/api/java/util/concurrent/… . para detalhes
Prahalad Deshpande
0

Eu tive a mesma necessidade no passado: uma espécie de fila de bloqueio com um tamanho fixo para cada cliente apoiado por um pool de thread compartilhado. Acabei escrevendo meu próprio tipo de ThreadPoolExecutor:

UserThreadPoolExecutor (fila de bloqueio (por cliente) + threadpool (compartilhado entre todos os clientes))

Veja: https://github.com/d4rxh4wx/UserThreadPoolExecutor

Cada UserThreadPoolExecutor recebe um número máximo de threads de um ThreadPoolExecutor compartilhado

Cada UserThreadPoolExecutor pode:

  • envie uma tarefa para o executor do pool de threads compartilhados se sua cota não for atingida. Se sua cota for atingida, o trabalho é enfileirado (bloqueio não consumptivo aguardando CPU). Assim que uma de suas tarefas enviadas for concluída, a cota é diminuída, permitindo que outra tarefa aguarde para ser enviada ao ThreadPoolExecutor
  • espere que as tarefas restantes sejam concluídas
d4rxh4wx
fonte
0

Encontrei essa política de rejeição no cliente de pesquisa elástica. Ele bloqueia o thread do chamador na fila de bloqueio. Código abaixo-

 static class ForceQueuePolicy implements XRejectedExecutionHandler 
 {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 
        {
            try 
            {
                executor.getQueue().put(r);
            } 
            catch (InterruptedException e) 
            {
                //should never happen since we never wait
                throw new EsRejectedExecutionException(e);
            }
        }

        @Override
        public long rejected() 
        {
            return 0;
        }
}
Vladislav
fonte
0

Recentemente, tive a necessidade de alcançar algo semelhante, mas em a ScheduledExecutorService.

Eu também tive que garantir que lidei com o atraso que está sendo passado no método e que a tarefa seja enviada para ser executada no momento em que o chamador espera ou apenas falhe, lançando um RejectedExecutionException.

Outros métodos ScheduledThreadPoolExecutorpara executar ou enviar uma tarefa chamam internamente #scheduleque, por sua vez, ainda invocará os métodos substituídos.

import java.util.concurrent.*;

public class BlockingScheduler extends ScheduledThreadPoolExecutor {
    private final Semaphore maxQueueSize;

    public BlockingScheduler(int corePoolSize,
                             ThreadFactory threadFactory,
                             int maxQueueSize) {
        super(corePoolSize, threadFactory, new AbortPolicy());
        this.maxQueueSize = new Semaphore(maxQueueSize);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(delay));
        return super.schedule(command, newDelayInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(callable, unit.toMillis(delay));
        return super.schedule(callable, newDelayInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
        return super.scheduleAtFixedRate(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long period,
                                                     TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
        return super.scheduleWithFixedDelay(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable t) {
        super.afterExecute(runnable, t);
        try {
            if (t == null && runnable instanceof Future<?>) {
                try {
                    ((Future<?>) runnable).get();
                } catch (CancellationException | ExecutionException e) {
                    t = e;
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
            if (t != null) {
                System.err.println(t);
            }
        } finally {
            releaseQueueUsage();
        }
    }

    private long beforeSchedule(Runnable runnable, long delay) {
        try {
            return getQueuePermitAndModifiedDelay(delay);
        } catch (InterruptedException e) {
            getRejectedExecutionHandler().rejectedExecution(runnable, this);
            return 0;
        }
    }

    private long beforeSchedule(Callable callable, long delay) {
        try {
            return getQueuePermitAndModifiedDelay(delay);
        } catch (InterruptedException e) {
            getRejectedExecutionHandler().rejectedExecution(new FutureTask(callable), this);
            return 0;
        }
    }

    private long getQueuePermitAndModifiedDelay(long delay) throws InterruptedException {
        final long beforeAcquireTimeStamp = System.currentTimeMillis();
        maxQueueSize.tryAcquire(delay, TimeUnit.MILLISECONDS);
        final long afterAcquireTimeStamp = System.currentTimeMillis();
        return afterAcquireTimeStamp - beforeAcquireTimeStamp;
    }

    private void releaseQueueUsage() {
        maxQueueSize.release();
    }
}

Eu tenho o código aqui, agradecemos qualquer feedback. https://github.com/AmitabhAwasthi/BlockingScheduler

Dev Amitabh
fonte
Essa resposta depende totalmente do conteúdo de links externos. Se algum dia eles se tornarem inválidos, sua resposta será inútil. Portanto, edite sua resposta e adicione pelo menos um resumo do que pode ser encontrado lá. Obrigado!
Fabio diz Restabelecer Monica,
@fabio: obrigado por apontar. Eu adicionei o código lá para que agora faça mais sentido para os leitores. Agradeço seu comentário :)
Dev Amitabh
0

Nem sempre gosto da CallerRunsPolicy, especialmente porque ela permite que a tarefa rejeitada 'pule a fila' e seja executada antes das tarefas enviadas anteriormente. Além disso, executar a tarefa no encadeamento de chamada pode demorar muito mais do que esperar que o primeiro slot fique disponível.

Resolvi esse problema usando um RejectedExecutionHandler personalizado, que simplesmente bloqueia o thread de chamada por um tempo e depois tenta enviar a tarefa novamente:

public class BlockWhenQueueFull implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {}

        executor.execute(r);
    }
}

Esta classe só pode ser usada no executor do pool de threads como um RejectedExecutinHandler como qualquer outra, por exemplo:

executorPool = new ThreadPoolExecutor(1, 1, 10,
                                      TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                                      new BlockWhenQueueFull());

A única desvantagem que vejo é que o thread de chamada pode ficar bloqueado um pouco mais do que o estritamente necessário (até 250 ms). Além disso, como esse executor está efetivamente sendo chamado recursivamente, esperas muito longas para que um thread se torne disponível (horas) pode resultar em um estouro de pilha.

No entanto, eu pessoalmente gosto desse método. É compacto, fácil de entender e funciona bem.

TinkerTank
fonte
1
Como você mesmo diz: isso pode criar um stackoverflow. Não é algo que eu gostaria de ter no código de produção.
Harald
Todo mundo deve tomar suas próprias decisões. Para minha carga de trabalho, isso não é um problema. As tarefas são executadas em segundos, em vez das horas que seriam necessárias para explodir. Além disso, o mesmo pode ser dito para virtualmente qualquer algoritmo recursivo. Essa é uma razão para nunca usar qualquer algoritmo recursivo na produção?
TinkerTank