Impossível criar um pool de encadeamentos em cache com um limite de tamanho?

127

Parece ser impossível criar um pool de threads em cache com um limite para o número de threads que ele pode criar.

Aqui está como Executors.newCachedThreadPool estático é implementado na biblioteca Java padrão:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Portanto, usando esse modelo para criar um pool de encadeamentos em cache de tamanho fixo:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

Agora, se você usar isso e enviar três tarefas, tudo ficará bem. O envio de outras tarefas resultará em exceções de execução rejeitadas.

Tentando isso:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

Resultará na execução seqüencial de todos os threads. Ou seja, o pool de threads nunca criará mais de um thread para lidar com suas tarefas.

Este é um erro no método de execução do ThreadPoolExecutor? Ou talvez isso seja intencional? Ou existe alguma outra maneira?

Editar: eu quero algo exatamente como o pool de threads em cache (ele cria threads sob demanda e os mata após algum tempo limite), mas com um limite no número de threads que ele pode criar e a capacidade de continuar na fila de tarefas adicionais depois atingiu seu limite de threads. De acordo com a resposta de sjlee, isso é impossível. Olhando para o método execute () do ThreadPoolExecutor, é realmente impossível. Eu precisaria subclassificar ThreadPoolExecutor e substituir execute () da mesma forma que o SwingWorker, mas o que o SwingWorker faz em seu execute () é um hack completo.

Matt Crinklaw-Vogt
fonte
1
Qual a sua pergunta? O seu segundo exemplo de código não é a resposta para o seu título?
Rsp
4
Quero um pool de threads que adicione threads sob demanda à medida que o número de tarefas aumenta, mas nunca adicionará mais do que um número máximo de threads. CachedThreadPool já faz isso, exceto que adicionará um número ilimitado de threads e não será interrompido em um tamanho predefinido. O tamanho que eu defino nos exemplos é 3. O segundo exemplo adiciona 1 encadeamento, mas não adiciona mais dois quando novas tarefas chegam enquanto as outras tarefas ainda não foram concluídas.
Matt Crinklaw-Vogt 26/11/2009
Verifique isso, resolve-o, debuggingisfun.blogspot.com/2012/05/…
ethan

Respostas:

235

O ThreadPoolExecutor possui os seguintes comportamentos principais, e seus problemas podem ser explicados por esses comportamentos.

Quando as tarefas são enviadas,

  1. Se o conjunto de encadeamentos não atingiu o tamanho do núcleo, ele cria novos encadeamentos.
  2. Se o tamanho do núcleo tiver sido atingido e não houver encadeamentos inativos, ele enfileirará tarefas.
  3. Se o tamanho do núcleo foi atingido, não há threads inativos e a fila fica cheia, ele cria novos threads (até atingir o tamanho máximo).
  4. Se o tamanho máximo foi atingido, não há threads ociosos e a fila fica cheia, a política de rejeição entra em ação.

No primeiro exemplo, observe que o SynchronousQueue possui essencialmente tamanho 0. Portanto, no momento em que você atinge o tamanho máximo (3), a política de rejeição entra em ação (nº 4).

No segundo exemplo, a fila de escolha é um LinkedBlockingQueue que possui um tamanho ilimitado. Portanto, você fica preso ao comportamento # 2.

Você não pode realmente mexer muito com o tipo em cache ou o tipo fixo, pois o comportamento deles é quase completamente determinado.

Se você deseja ter um conjunto de encadeamentos dinâmico e limitado, é necessário usar um tamanho de núcleo positivo e um tamanho máximo combinados com uma fila de tamanho finito. Por exemplo,

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

Adendo : esta é uma resposta bastante antiga e parece que o JDK mudou seu comportamento quando se trata de um tamanho de núcleo igual a 0. Desde o JDK 1.6, se o tamanho do núcleo for 0 e o pool não tiver nenhum thread, o ThreadPoolExecutor adicionará um thread para executar essa tarefa. Portanto, o tamanho do núcleo de 0 é uma exceção à regra acima. Obrigado Steve por trazer isso à minha atenção.

sjlee
fonte
4
Você deve escrever algumas palavras sobre o método allowCoreThreadTimeOutpara tornar essa resposta perfeita. Veja a resposta de @ user1046052
hsestupin
1
Ótima resposta! Apenas um ponto a acrescentar: Outras políticas de rejeição também merecem destaque. Veja a resposta de @brianegge
Jeff
1
O comportamento 2 não deveria dizer 'Se o tamanho maxThread foi atingido e não há threads ociosos, ele enfileira tarefas.' ?
Zoltán
1
Você poderia elaborar o que o tamanho da fila implica? Isso significa que apenas 20 tarefas podem ser enfileiradas antes de serem rejeitadas?
Zoltán
1
@ Zoltán Escrevi isso há um tempo atrás, então há uma chance de que algum comportamento possa ter mudado desde então (não acompanhei as atividades mais recentes), mas, supondo que esse comportamento seja inalterado, o número 2 está correto como indicado, e é isso talvez o ponto mais importante (e um tanto surpreendente) disso. Depois que o tamanho do núcleo é atingido, o TPE favorece a fila ao invés de criar novos threads. O tamanho da fila é literalmente o tamanho da fila que é passada para o TPE. Se a fila ficar cheia, mas não atingir o tamanho máximo, ela criará um novo encadeamento (não rejeitará tarefas). Veja # 3. Espero que ajude.
Sjlee #
60

A menos que eu tenha perdido algo, a solução para a pergunta original é simples. O código a seguir implementa o comportamento desejado, conforme descrito no pôster original. Gerará até 5 threads para trabalhar em uma fila ilimitada e os threads inativos serão encerrados após 60 segundos.

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);

fonte
1
Você está certo. Esse método foi adicionado no jdk 1.6, para que muitas pessoas não saibam disso. Além disso, você não pode ter um tamanho mínimo de pool principal, o que é lamentável.
jtahlborn
4
Minha única preocupação com isso é (dos documentos do JDK 8): "Quando uma nova tarefa é enviada no método execute (Runnable) e menos do que os threads corePoolSize estão em execução, um novo thread é criado para lidar com a solicitação, mesmo se outro trabalhador os threads estão ociosos ".
Veegee
Tenho certeza de que isso realmente não funciona. Na última vez em que fiz isso, na verdade, apenas executamos seu trabalho em um thread, mesmo que você tenha 5. Mais uma vez, alguns anos depois, mas quando eu mergulhei na implementação do ThreadPoolExecutor, ele só despachou para novos threads quando a fila estava cheia. O uso de uma fila ilimitada faz com que isso nunca aconteça. Você pode testar enviando trabalho e registrando o nome do encadeamento e, em seguida, dormindo. Todo executável acabará imprimindo o mesmo nome / não será executado em nenhum outro encadeamento.
Matt Crinklaw-Vogt
2
Isso funciona, Matt. Você define o tamanho do núcleo como 0, é por isso que você tinha apenas 1 thread. O truque aqui é definir o tamanho do núcleo para o tamanho máximo.
T-Gergely
1
@vegee está certo - Na verdade, isso não funciona muito bem - o ThreadPoolExecutor só reutilizará os threads quando estiver acima do corePoolSize. Portanto, quando corePoolSize for igual a maxPoolSize, você somente se beneficiará do cache do encadeamento quando o pool estiver cheio (portanto, se você pretende usá-lo, mas geralmente permanece abaixo do tamanho máximo do pool, é melhor reduzir o tempo limite do encadeamento para um valor baixo. valor, e estar ciente de que não há cache - sempre novas threads)
Chris Riddell
7

Teve o mesmo problema. Como nenhuma outra resposta reúne todas as questões, estou adicionando a minha:

Agora está claramente escrito nos documentos : Se você usar uma fila que não bloqueia ( LinkedBlockingQueue) a configuração de threads máximos não terá efeito, apenas os threads principais serão usados.

tão:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

Este executor possui:

  1. Nenhum conceito de threads máximos, pois estamos usando uma fila ilimitada. Isso é bom porque essa fila pode fazer com que o executor crie um grande número de threads extras que não são essenciais, se seguir sua política usual.

  2. Uma fila de tamanho máximo Integer.MAX_VALUE. Submit()será lançado RejectedExecutionExceptionse o número de tarefas pendentes exceder Integer.MAX_VALUE. Não tenho certeza se ficaremos sem memória primeiro ou isso acontecerá.

  3. Possui 4 threads principais possíveis. Os encadeamentos ociosos do núcleo saem automaticamente se ociosos por 5 segundos. Então, sim, encadeamentos estritamente sob demanda. O número pode ser variado usando o setThreads()método

  4. Garante que o número mínimo de threads do núcleo nunca seja menor que um, ou então submit()rejeitará todas as tarefas. Como os threads principais precisam ser> = max threads, o método também setThreads()define o máximo de threads, embora a configuração do max thread seja inútil para uma fila ilimitada.

SD
fonte
Eu acho que você também precisa definir 'allowCoreThreadTimeOut' como 'true', caso contrário, uma vez que os threads sejam criados, você os manterá por toda a eternidade: gist.github.com/ericdcobb/46b817b384f5ca9d5f5d
eric
Opa, eu apenas perdi isso, desculpe, sua resposta é perfeita então!
22614 eric
6

No seu primeiro exemplo, as tarefas subseqüentes são rejeitadas porque AbortPolicyé o padrão RejectedExecutionHandler. O ThreadPoolExecutor contém as seguintes políticas, que você pode alterar através do setRejectedExecutionHandlermétodo:

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

Parece que você deseja um pool de threads em cache com um CallerRunsPolicy.

brianegge
fonte
5

Nenhuma das respostas aqui resolveu meu problema, que tinha a ver com a criação de uma quantidade limitada de conexões HTTP usando o cliente HTTP do Apache (versão 3.x). Como levei algumas horas para descobrir uma boa configuração, vou compartilhar:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

Isso cria um ThreadPoolExecutorque começa com cinco e mantém um máximo de dez threads em execução simultânea usando CallerRunsPolicypara execução.

Paulo
fonte
O problema com esta solução é que, se você aumentar o número ou produtores, aumentará o número de threads executando os threads em segundo plano. Em muitos casos, não é isso que você deseja.
Grey
3

De acordo com o Javadoc para ThreadPoolExecutor:

Se houver mais que threads corePoolSize, mas menos que maximumPoolSize em execução, um novo thread será criado apenas se a fila estiver cheia . Ao definir corePoolSize e maximumPoolSize da mesma forma, você cria um conjunto de encadeamentos de tamanho fixo.

(Ênfase minha.)

A resposta do jitter é o que você deseja, embora a minha responda à sua outra pergunta. :)

Jonathan Feinberg
fonte
2

existe mais uma opção. Em vez de usar o novo SynchronousQueue, você também pode usar qualquer outra fila, mas é necessário garantir que o tamanho seja 1, para forçar o serviço do executor a criar um novo encadeamento.

Ashkrit
fonte
Eu acho que você quer dizer o tamanho 0 (por padrão), para que não haja tarefas na fila e realmente force o serviço do executor a criar sempre um novo thread.
Leonmax
2

Não parece que nenhuma das respostas realmente responda à pergunta - na verdade, não vejo uma maneira de fazer isso - mesmo se você subclasse de PooledExecutorService, pois muitos dos métodos / propriedades são privados, por exemplo, tornando protegido o addIfUnderMaximumPoolSize, você poderia faça o seguinte:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

O mais próximo que cheguei foi disso - mas mesmo isso não é uma solução muito boa

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

ps não testou o acima

Stuart
fonte
2

Aqui está outra solução. Eu acho que essa solução se comporta como você deseja (embora não tenha orgulho dessa solução):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);
Stuart
fonte
2

É isso que você quer (pelo menos eu acho). Para uma explicação, verifique a resposta de Jonathan Feinberg

Executors.newFixedThreadPool(int n)

Cria um conjunto de encadeamentos que reutiliza um número fixo de encadeamentos que operam em uma fila ilimitada compartilhada. A qualquer momento, no máximo os nThreads threads serão tarefas de processamento ativas. Se tarefas adicionais forem enviadas quando todos os encadeamentos estiverem ativos, eles aguardarão na fila até que um encadeamento esteja disponível. Se algum encadeamento terminar devido a uma falha durante a execução anterior ao encerramento, um novo substituirá, se necessário, para executar tarefas subseqüentes. Os encadeamentos no pool existirão até que seja explicitamente encerrado.

nervosismo
fonte
4
Claro, eu poderia usar um pool de threads fixo, mas isso deixaria n threads por toda a eternidade, ou até eu chamar de desligamento. Eu quero algo exatamente como o pool de threads em cache (ele cria threads sob demanda e os mata após um tempo limite), mas com um limite no número de threads que ele pode criar.
Matt Crinklaw-Vogt 26/11/2009
0
  1. Você pode usar ThreadPoolExecutorcomo sugerido por @sjlee

    Você pode controlar o tamanho do pool dinamicamente. Dê uma olhada nesta pergunta para obter mais detalhes:

    Pool de segmentos dinâmicos

    OU

  2. Você pode usar a API newWorkStealingPool , que foi introduzida com o java 8.

    public static ExecutorService newWorkStealingPool()

    Cria um pool de threads que rouba trabalho, usando todos os processadores disponíveis como seu nível de paralelismo de destino.

Por padrão, o nível de paralelismo é definido como o número de núcleos da CPU em seu servidor. Se você tiver um servidor de CPU com 4 núcleos, o tamanho do conjunto de encadeamentos será 4. Essa API retorna o ForkJoinPooltipo de ExecutorService e permite o trabalho de roubar encadeamentos ociosos roubando tarefas de encadeamentos ocupados no ForkJoinPool.

Ravindra babu
fonte
0

O problema foi resumido da seguinte maneira:

Eu quero algo exatamente como o pool de encadeamentos em cache (ele cria encadeamentos sob demanda e depois os mata após algum tempo limite), mas com um limite no número de encadeamentos que ele pode criar e a capacidade de continuar a enfileirar tarefas adicionais quando atingir seu limite de rosca.

Antes de apontar para a solução, explicarei por que as seguintes soluções não funcionam:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

Isso não enfileirará nenhuma tarefa quando o limite de 3 for atingido porque SynchronousQueue, por definição, não pode conter nenhum elemento.

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

Isso não criará mais de um thread único porque o ThreadPoolExecutor cria apenas threads que excedem o corePoolSize se a fila estiver cheia. Mas o LinkedBlockingQueue nunca está cheio.

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>());
executor.allowCoreThreadTimeOut(true);

Isso não reutilizará os threads até que o corePoolSize seja atingido porque ThreadPoolExecutor aumenta o número de threads até que o corePoolSize seja atingido, mesmo que os threads existentes estejam ociosos. Se você pode viver com essa desvantagem, esta é a solução mais fácil para o problema. É também a solução descrita em "Concorrência Java na Prática" (nota de rodapé na p172).

A única solução completa para o problema descrito parece ser a que envolve substituir o offermétodo da fila e escrever um RejectedExecutionHandlercomo explicado nas respostas a esta pergunta: Como fazer com que o ThreadPoolExecutor aumente o número máximo de threads antes de enfileirar?

Stefan Feuerhahn
fonte
0

Isso funciona para Java8 + (e outros, por enquanto ..)

     Executor executor = new ThreadPoolExecutor(3, 3, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()){{allowCoreThreadTimeOut(true);}};

onde 3 é o limite da contagem de threads e 5 é o tempo limite para threads inativos.

Se você deseja verificar se funciona , aqui está o código para fazer o trabalho:

public static void main(String[] args) throws InterruptedException {
    final int DESIRED_NUMBER_OF_THREADS=3; // limit of number of Threads for the task at a time
    final int DESIRED_THREAD_IDLE_DEATH_TIMEOUT=5; //any idle Thread ends if it remains idle for X seconds

    System.out.println( java.lang.Thread.activeCount() + " threads");
    Executor executor = new ThreadPoolExecutor(DESIRED_NUMBER_OF_THREADS, DESIRED_NUMBER_OF_THREADS, DESIRED_THREAD_IDLE_DEATH_TIMEOUT, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()) {{allowCoreThreadTimeOut(true);}};

    System.out.println(java.lang.Thread.activeCount() + " threads");

    for (int i = 0; i < 5; i++) {
        final int fi = i;
        executor.execute(() -> waitsout("starting hard thread computation " + fi, "hard thread computation done " + fi,2000));
    }
    System.out.println("If this is UP, it works");

    while (true) {
        System.out.println(
                java.lang.Thread.activeCount() + " threads");
        Thread.sleep(700);
    }

}

static void waitsout(String pre, String post, int timeout) {
    try {
        System.out.println(pre);
        Thread.sleep(timeout);
        System.out.println(post);
    } catch (Exception e) {
    }
}

saída do código acima para mim é

1 threads
1 threads
If this is UP, it works
starting hard thread computation 0
4 threads
starting hard thread computation 2
starting hard thread computation 1
4 threads
4 threads
hard thread computation done 2
hard thread computation done 0
hard thread computation done 1
starting hard thread computation 3
starting hard thread computation 4
4 threads
4 threads
4 threads
hard thread computation done 3
hard thread computation done 4
4 threads
4 threads
4 threads
4 threads
3 threads
3 threads
3 threads
1 threads
1 threads
Ondřej
fonte