Existe um ExecutorService que usa o segmento atual?

94

O que estou procurando é uma forma compatível de configurar o uso de um pool de threads ou não. Idealmente, o resto do código não deve ser afetado. Eu poderia usar um pool de threads com 1 thread, mas não é bem isso que eu quero. Alguma ideia?

ExecutorService es = threads == 0 ? new CurrentThreadExecutor() : Executors.newThreadPoolExecutor(threads);

// es.execute / es.submit / new ExecutorCompletionService(es) etc
Michael Rutherfurd
fonte

Respostas:

69

Aqui está uma implementação realmente simples Executor(não ExecutorService, lembre-se) que usa apenas o thread atual. Roubando isso de "Java Concurrency in Practice" (leitura essencial).

public class CurrentThreadExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}

ExecutorService é uma interface mais elaborada, mas poderia ser tratada com a mesma abordagem.

pensar demais
fonte
4
+1: Como você disse, um ExecutorService poderia ser tratado da mesma maneira, talvez criando uma subclasse de AbstractExecutorService.
Paul Cager
@Paul Sim, AbstractExecutorServiceparece o caminho a seguir.
pense em
14
Em Java8, você pode reduzir isso para apenasRunnable::run
Jon Freedman,
@Juude sempre será executado no thread que chama o executor.
Gustav Karlsson
Não é o objetivo de um executor do mesmo thread, ser capaz de agendar mais tarefas de dentro de execute ()? Esta resposta não serve. Não consigo encontrar uma resposta que satisfaça isso.
haelix
82

Você pode usar Goiaba MoreExecutors.newDirectExecutorService(), ou MoreExecutors.directExecutor()se você não precisa de um ExecutorService.

Se incluir Goiaba for muito pesado, você pode implementar algo quase tão bom:

public final class SameThreadExecutorService extends ThreadPoolExecutor {
  private final CountDownLatch signal = new CountDownLatch(1);

  private SameThreadExecutorService() {
    super(1, 1, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());
  }

  @Override public void shutdown() {
    super.shutdown();
    signal.countDown();
  }

  public static ExecutorService getInstance() {
    return SingletonHolder.instance;
  }

  private static class SingletonHolder {
    static ExecutorService instance = createInstance();    
  }

  private static ExecutorService createInstance() {
    final SameThreadExecutorService instance
        = new SameThreadExecutorService();

    // The executor has one worker thread. Give it a Runnable that waits
    // until the executor service is shut down.
    // All other submitted tasks will use the RejectedExecutionHandler
    // which runs tasks using the  caller's thread.
    instance.submit(new Runnable() {
        @Override public void run() {
          boolean interrupted = false;
          try {
            while (true) {
              try {
                instance.signal.await();
                break;
              } catch (InterruptedException e) {
                interrupted = true;
              }
            }
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }
        }});
    return Executors.unconfigurableScheduledExecutorService(instance);
  }
}
NamshubWriter
fonte
1
Para Android, é return Executors.unconfigurableExecutorService (instance);
Maragues
se tudo o que usamos é o thread atual , por que primitivas de sincronização? por que a trava?
haelix
@haelix a trava é necessária porque mesmo que o trabalho seja feito no mesmo thread que o adicionou, qualquer thread pode desligar o executor.
NamshubWriter
64

Estilo Java 8:

Executor e = Runnable::run;

lpandzic
fonte
7
Absolutamente imundo. Eu amo isso.
Rogue
O que há de sujo nisso? É elegante :)
lpandzic
É o melhor tipo de sujo @Ipandzic, é incomum e sucinto.
Rogue
12

Escrevi um ExecutorServicebaseado no AbstractExecutorService.

/**
 * Executes all submitted tasks directly in the same thread as the caller.
 */
public class SameThreadExecutorService extends AbstractExecutorService {

    //volatile because can be viewed by other threads
    private volatile boolean terminated;

    @Override
    public void shutdown() {
        terminated = true;
    }

    @Override
    public boolean isShutdown() {
        return terminated;
    }

    @Override
    public boolean isTerminated() {
        return terminated;
    }

    @Override
    public boolean awaitTermination(long theTimeout, TimeUnit theUnit) throws InterruptedException {
        shutdown(); // TODO ok to call shutdown? what if the client never called shutdown???
        return terminated;
    }

    @Override
    public List<Runnable> shutdownNow() {
        return Collections.emptyList();
    }

    @Override
    public void execute(Runnable theCommand) {
        theCommand.run();
    }
}
Eric Obermühlner
fonte
campo terminado não é protegido com sincronizado.
Daneel S. Yaitskov,
1
O terminatedcampo @ DaneelS.Yaitskov não se beneficiará do acesso sincronizado com base no código que está realmente aqui. As operações em campos de 32 bits são atômicas em Java.
Christopher Schultz
Suponho que o método isTerminated () acima não esteja certo porque isTerminated () só deve retornar verdadeiro se não houver tarefas em execução no momento. O Guava rastreia o número de tarefas em outra variável, provavelmente por isso que protegem ambas as variáveis ​​com um bloqueio.
Jeremy K
6

Você pode usar o RejectedExecutionHandler para executar a tarefa no segmento atual.

public static final ThreadPoolExecutor CURRENT_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 0, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        r.run();
    }
});

Você só precisa de um desses.

Peter Lawrey
fonte
Inteligente! Quão seguro é isso (pergunta honesta)? Existe alguma maneira de uma tarefa ser rejeitada onde você realmente não gostaria de executá-la no thread atual? As tarefas são rejeitadas se o ExecutorService estiver sendo encerrado ou encerrado?
pense em
Como o tamanho máximo é 0, todas as tarefas são rejeitadas. No entanto, o comportamento rejeitado é executar no segmento atual. Só haveria problema se a tarefa NÃO fosse rejeitada.
Peter Lawrey
8
note que já existe uma implementação desta política, não há necessidade de definir a sua própria java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy.
jtahlborn
7
Não é mais possível criar um ThreadPoolExecutor com um tamanho máximo de pool de 0. Acho que seria possível reproduzir o comportamento usando um blockingQueue de tamanho 0, mas nenhuma implementação padrão parece permitir isso.
Axelle Ziegler,
que não compilará devido a {code} if (corePoolSize <0 || maximumPoolSize <= 0 || maximumPoolSize <corePoolSize || keepAliveTime <0) {code} em java.util.ThreadPoolExecutor (pelo menos openJdk 7)
Bogdan
6

Tive que usar o mesmo "CurrentThreadExecutorService" para fins de teste e, embora todas as soluções sugeridas fossem boas (principalmente aquela que mencionava o método Guava ), descobri algo semelhante ao que Peter Lawrey sugeriu aqui .

Conforme mencionado por Axelle Ziegler aqui , infelizmente a solução de Peter não funcionará de fato por causa da verificação introduzida no parâmetro ThreadPoolExecutordo maximumPoolSizeconstrutor (ou seja, maximumPoolSizenão pode ser <=0).

Para contornar isso, fiz o seguinte:

private static ExecutorService currentThreadExecutorService() {
    CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
    return new ThreadPoolExecutor(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), callerRunsPolicy) {
        @Override
        public void execute(Runnable command) {
            callerRunsPolicy.rejectedExecution(command, this);
        }
    };
}
Fabriziocucci
fonte