espere até que todos os threads terminem seu trabalho em java

91

Estou escrevendo um aplicativo que possui 5 threads que obtêm algumas informações da web simultaneamente e preenchem 5 campos diferentes em uma classe de buffer.
Preciso validar os dados do buffer e armazená-los em um banco de dados quando todos os threads terminarem seu trabalho.
Como posso fazer isso (ser alertado quando todos os threads concluírem seu trabalho)?

RYN
fonte
4
Thread.join é uma maneira idiossincrática de Java de baixo nível para resolver o problema. Além disso, é problemático porque a API Thread é falha: você não pode saber se a junção foi concluída com êxito ou não (consulte Java Concurrency In Practice ). Abstração de nível superior, como usar um CountDownLatch, pode ser preferível e parecer mais natural para programadores que não estão "presos" na mentalidade idiossincrática do Java. Não discuta comigo, vá discutir com Doug Lea; )
Cedric Martin,

Respostas:

119

A abordagem que utilizo é usar um ExecutorService para gerenciar pools de threads.

ExecutorService es = Executors.newCachedThreadPool();
for(int i=0;i<5;i++)
    es.execute(new Runnable() { /*  your task */ });
es.shutdown();
boolean finished = es.awaitTermination(1, TimeUnit.MINUTES);
// all tasks have finished or the time has been reached.
Peter Lawrey
fonte
7
@Leonid é exatamente o que shutdown () faz.
Peter Lawrey
3
while(!es.awaitTermination(1, TimeUnit.MINUTES));
Aquarius Power
3
@AquariusPower Você poderia simplesmente dizer para esperar mais ou para sempre.
Peter Lawrey
1
Oh, entendi; então, adicionei uma mensagem no loop dizendo que ele está aguardando a conclusão de todos os threads; obrigado!
Aquarius Power
1
@PeterLawrey, É necessário ligar es.shutdown();? e se eu escrever um código no qual execute um thread usando es.execute(runnableObj_ZipMaking);em trybloco e em finallyeu chame boolean finshed = es.awaitTermination(10, TimeUnit.MINUTES);. Portanto, suponho que isso deva esperar até que todos os threads concluam seu trabalho ou o tempo limite ocorra (o que ocorrer primeiro). Minha suposição está correta? ou ligar para shutdown()é obrigatório?
Amogh,
52

Você pode joinpara os tópicos. A junção bloqueia até que o thread seja concluído.

for (Thread thread : threads) {
    thread.join();
}

Observe que joinlança um InterruptedException. Você terá que decidir o que fazer se isso acontecer (por exemplo, tentar cancelar os outros threads para evitar que trabalho desnecessário seja feito).

Mark Byers
fonte
1
Esses threads são executados em paralelo ou sequencialmente entre si?
James Webster,
5
@JamesWebster: Paralelo.
RYN
4
@James Webster: A declaração t.join();significa que o thread atual bloqueia até que o thread ttermine. Não afeta o segmento t.
Mark Byers
1
Obrigado. =] Estudei paralelismo na universidade, mas foi a única coisa que tive dificuldade em aprender! Felizmente, não preciso usá-lo muito agora ou, quando o faço, não é muito complexo ou não há recursos compartilhados e o bloqueio não é crítico
James Webster
1
@ 4r1y4n Se o código fornecido é realmente paralelo depende do que você está tentando fazer com ele e está mais relacionado à agregação de dados espalhados por coleções usando threads unidos. Você está ingressando em tópicos, o que potencialmente significa "ingressar" em dados. Além disso, paralelismo NÃO significa necessariamente simultaneidade. Isso depende das CPUs. Pode muito bem ser que os threads estejam executando em paralelo, mas os cálculos estão acontecendo em qualquer ordem determinada pela CPU subjacente.
22

Dê uma olhada em várias soluções.

  1. join()API foi introduzida nas primeiras versões do Java. Algumas boas alternativas estão disponíveis com este pacote simultâneo desde o lançamento do JDK 1.5.

  2. ExecutorService # invokeAll ()

    Executa as tarefas fornecidas, retornando uma lista de Futuros com seus status e resultados quando tudo estiver concluído.

    Consulte esta questão SE relacionada para obter um exemplo de código:

    Como usar invokeAll () para permitir que todos os conjuntos de threads realizem suas tarefas?

  3. CountDownLatch

    Um auxílio de sincronização que permite que um ou mais threads aguardem até que um conjunto de operações sendo realizadas em outros threads seja concluído.

    Um CountDownLatch é inicializado com uma determinada contagem. Os métodos de await são bloqueados até que a contagem atual chegue a zero devido às invocações do countDown()método, após o que todos os threads em espera são liberados e quaisquer invocações subsequentes de await retornam imediatamente. Este é um fenômeno único - a contagem não pode ser zerada. Se você precisar de uma versão que redefina a contagem, considere o uso de um CyclicBarrier .

    Consulte esta pergunta para o uso de CountDownLatch

    Como esperar por um segmento que gera seu próprio segmento?

  4. ForkJoinPool ou newWorkStealingPool () em Executores

  5. Iterar por meio de todos os objetos Future criados após o envio paraExecutorService

Ravindra babu
fonte
11

Além do Thread.join()sugerido por outros, o java 5 introduziu a estrutura do executor. Lá você não trabalha com Threadobjetos. Em vez disso, você envia seus objetos Callableou Runnablea um executor. Existe um executor especial que se destina a executar várias tarefas e retornar seus resultados fora de ordem. Esse é o ExecutorCompletionService:

ExecutorCompletionService executor;
for (..) {
    executor.submit(Executors.callable(yourRunnable));
}

Em seguida, você pode chamar repetidamente take()até que não haja mais Future<?>objetos para retornar, o que significa que todos eles estão concluídos.


Outra coisa que pode ser relevante, dependendo do seu cenário é CyclicBarrier.

Um auxílio de sincronização que permite que um conjunto de threads espere um pelo outro para atingir um ponto de barreira comum. CyclicBarriers são úteis em programas que envolvem um grupo de threads de tamanho fixo que deve ocasionalmente esperar um pelo outro. A barreira é chamada de cíclica porque pode ser reutilizada depois que os threads em espera são liberados.

Bozho
fonte
Está perto, mas ainda faria alguns ajustes. executor.submitretorna a Future<?>. Eu acrescentaria esses futuros a uma lista e, em seguida, percorreria a lista chamando getcada futuro.
Ray
Além disso, você pode instanciar um construtor usando Executors, por exemplo, Executors.newCachedThreadPool(ou semelhante)
Ray
10

Outra possibilidade é o CountDownLatchobjeto, que é útil para situações simples: já que você sabe de antemão o número de threads, você inicializa com a contagem relevante e passa a referência do objeto para cada thread.
Após a conclusão de sua tarefa, cada thread chama o CountDownLatch.countDown()que diminui o contador interno. O thread principal, após iniciar todos os outros, deve fazer a CountDownLatch.await()chamada de bloqueio. Ele será liberado assim que o contador interno atingir 0.

Preste atenção que com este objeto, um também InterruptedExceptionpode ser lançado.

interDist
fonte
8

Você faz

for (Thread t : new Thread[] { th1, th2, th3, th4, th5 })
    t.join()

Após este loop for, você pode ter certeza de que todos os threads concluíram seus trabalhos.

aioobe
fonte
6

Espere / bloqueie o Thread Principal até que outros threads concluam seu trabalho.

Como @Ravindra babudisse, pode ser alcançado de várias formas, mas mostrando com exemplos.

  • java.lang.Thread. join () Desde: 1.0

    public static void joiningThreads() throws InterruptedException {
        Thread t1 = new Thread( new LatchTask(1, null), "T1" );
        Thread t2 = new Thread( new LatchTask(7, null), "T2" );
        Thread t3 = new Thread( new LatchTask(5, null), "T3" );
        Thread t4 = new Thread( new LatchTask(2, null), "T4" );
    
        // Start all the threads
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    
        // Wait till all threads completes
        t1.join();
        t2.join();
        t3.join();
        t4.join();
    }
  • java.util.concurrent.CountDownLatch desde: 1.5

    • .countDown() «Diminui a contagem do grupo de trava.
    • .await() «Os métodos de espera bloqueiam até que a contagem atual chegue a zero.

    Se você criou latchGroupCount = 4então countDown()deve ser chamado 4 vezes para fazer contar 0. Então, isso await()irá liberar os threads de bloqueio.

    public static void latchThreads() throws InterruptedException {
        int latchGroupCount = 4;
        CountDownLatch latch = new CountDownLatch(latchGroupCount);
        Thread t1 = new Thread( new LatchTask(1, latch), "T1" );
        Thread t2 = new Thread( new LatchTask(7, latch), "T2" );
        Thread t3 = new Thread( new LatchTask(5, latch), "T3" );
        Thread t4 = new Thread( new LatchTask(2, latch), "T4" );
    
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    
        //latch.countDown();
    
        latch.await(); // block until latchGroupCount is 0.
    }

Exemplo de código da classe Threaded LatchTask. Para testar o uso da abordagem joiningThreads(); e latchThreads();do método principal.

class LatchTask extends Thread {
    CountDownLatch latch;
    int iterations = 10;
    public LatchTask(int iterations, CountDownLatch latch) {
        this.iterations = iterations;
        this.latch = latch;
    }

    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + " : Started Task...");

        for (int i = 0; i < iterations; i++) {
            System.out.println(threadName + " : " + i);
            MainThread_Wait_TillWorkerThreadsComplete.sleep(1);
        }
        System.out.println(threadName + " : Completed Task");
        // countDown() « Decrements the count of the latch group.
        if(latch != null)
            latch.countDown();
    }
}
  • CyclicBarriers Um auxílio de sincronização que permite que um conjunto de threads esperem um pelo outro para atingir um ponto de barreira comum. CyclicBarriers são úteis em programas que envolvem um grupo de threads de tamanho fixo que ocasionalmente deve esperar uns pelos outros. A barreira é chamada de cíclica porque pode ser reutilizada depois que os threads em espera são liberados.
    CyclicBarrier barrier = new CyclicBarrier(3);
    barrier.await();
    Por exemplo, consulte esta classe Concurrent_ParallelNotifyies .

  • Estrutura do executor : podemos usar ExecutorService para criar um pool de threads e rastrear o progresso das tarefas assíncronas com Future.

    • submit(Runnable), submit(Callable)que retorna Objeto Futuro. Ao usar a future.get()função, podemos bloquear o thread principal até que os threads de trabalho concluam seu trabalho.

    • invokeAll(...) - retorna uma lista de objetos Future por meio dos quais você pode obter os resultados das execuções de cada Callable.

Encontre exemplos de uso de Interfaces Runnable, Callable with Executor framework.


@Veja também

Yash
fonte
4

Armazene os objetos Thread em alguma coleção (como uma List ou Set) e, em seguida, faça um loop pela coleção assim que os threads forem iniciados e chame join () nos Threads.

esaj
fonte
2

Embora não seja relevante para o problema do OP, se você estiver interessado na sincronização (mais precisamente, um rendez-vous) com exatamente um thread, você pode usar um Exchanger

No meu caso, precisei pausar o thread pai até que o thread filho fizesse algo, por exemplo, concluía sua inicialização. Um CountDownLatch também funciona bem.

18446744073709551615
fonte
1

tente isso, vai funcionar.

  Thread[] threads = new Thread[10];

  List<Thread> allThreads = new ArrayList<Thread>();

  for(Thread thread : threads){

        if(null != thread){

              if(thread.isAlive()){

                    allThreads.add(thread);

              }

        }

  }

  while(!allThreads.isEmpty()){

        Iterator<Thread> ite = allThreads.iterator();

        while(ite.hasNext()){

              Thread thread = ite.next();

              if(!thread.isAlive()){

                   ite.remove();
              }

        }

   }
Jeyaraj.J
fonte
1

Tive um problema semelhante e acabei usando Java 8 parallelStream.

requestList.parallelStream().forEach(req -> makeRequest(req));

É super simples e legível. Nos bastidores, ele está usando o pool de junção de bifurcação padrão da JVM, o que significa que ele aguardará a conclusão de todos os encadeamentos antes de continuar. Para o meu caso, foi uma solução bacana, porque era o único parallelStream em meu aplicativo. Se você tiver mais de um parallelStream em execução simultaneamente, leia o link abaixo.

Mais informações sobre fluxos paralelos aqui .

Madis Pukkonen
fonte
0

As respostas existentes disse que poderia join()cada segmento.

Mas existem várias maneiras de obter a lista / array de threads:

  • Adicione o Thread em uma lista na criação.
  • Use ThreadGrouppara gerenciar os threads.

O código a seguir usará a ThreadGruopabordagem. Ele cria um grupo primeiro, então ao criar cada thread especifica o grupo no construtor, mais tarde pode obter a matriz de thread por meio deThreadGroup.enumerate()


Código

SyncBlockLearn.java

import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * synchronized block - learn,
 *
 * @author eric
 * @date Apr 20, 2015 1:37:11 PM
 */
public class SyncBlockLearn {
    private static final int TD_COUNT = 5; // thread count
    private static final int ROUND_PER_THREAD = 100; // round for each thread,
    private static final long INC_DELAY = 10; // delay of each increase,

    // sync block test,
    @Test
    public void syncBlockTest() throws InterruptedException {
        Counter ct = new Counter();
        ThreadGroup tg = new ThreadGroup("runner");

        for (int i = 0; i < TD_COUNT; i++) {
            new Thread(tg, ct, "t-" + i).start();
        }

        Thread[] tArr = new Thread[TD_COUNT];
        tg.enumerate(tArr); // get threads,

        // wait all runner to finish,
        for (Thread t : tArr) {
            t.join();
        }

        System.out.printf("\nfinal count: %d\n", ct.getCount());
        Assert.assertEquals(ct.getCount(), TD_COUNT * ROUND_PER_THREAD);
    }

    static class Counter implements Runnable {
        private final Object lkOn = new Object(); // the object to lock on,
        private int count = 0;

        @Override
        public void run() {
            System.out.printf("[%s] begin\n", Thread.currentThread().getName());

            for (int i = 0; i < ROUND_PER_THREAD; i++) {
                synchronized (lkOn) {
                    System.out.printf("[%s] [%d] inc to: %d\n", Thread.currentThread().getName(), i, ++count);
                }
                try {
                    Thread.sleep(INC_DELAY); // wait a while,
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.printf("[%s] end\n", Thread.currentThread().getName());
        }

        public int getCount() {
            return count;
        }
    }
}

O encadeamento principal aguardará que todos os encadeamentos do grupo sejam concluídos.

Eric Wang
fonte
0

Criei um pequeno método auxiliar para aguardar a conclusão de alguns Threads:

public static void waitForThreadsToFinish(Thread... threads) {
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
Alon Gouldman
fonte
-1

Use isto em seu thread principal: while (! Executor.isTerminated ()); Coloque esta linha de código após iniciar todos os threads do serviço do executor. Isso só iniciará o thread principal depois que todos os threads iniciados pelos executores forem concluídos. Certifique-se de chamar executor.shutdown (); antes do loop acima.

Magali
fonte
Esta é a espera ativa, que fará com que a CPU esteja constantemente executando um loop vazio. Muito desperdício.
Adam Michalik