Como esperar que vários tópicos sejam concluídos?

109

Qual é a maneira de simplesmente esperar que todo o processo encadeado termine? Por exemplo, digamos que eu tenho:

public class DoSomethingInAThread implements Runnable{

    public static void main(String[] args) {
        for (int n=0; n<1000; n++) {
            Thread t = new Thread(new DoSomethingInAThread());
            t.start();
        }
        // wait for all threads' run() methods to complete before continuing
    }

    public void run() {
        // do something here
    }


}

Como altero isso para que o main()método pause no comentário até que todos os run()métodos de encadeamento sejam encerrados? Obrigado!

DivideByHero
fonte

Respostas:

163

Você coloca todos os threads em uma matriz, inicia todos eles e, em seguida, tem um loop

for(i = 0; i < threads.length; i++)
  threads[i].join();

Cada junção será bloqueada até que o respectivo segmento seja concluído. Os threads podem ser concluídos em uma ordem diferente daquela em que você os juntou, mas isso não é um problema: quando o loop termina, todos os threads estão concluídos.

Martin v. Löwis
fonte
1
@Mykola: qual é exatamente a vantagem de usar um grupo de discussão? Só porque a API está lá não significa que você precisa usá-la ...
Martin v. Löwis
2
Veja: "Um grupo de threads representa um conjunto de threads." Isso é semântico correto para este caso de uso! E: "Um tópico tem permissão para acessar informações sobre seu próprio grupo de tópicos"
Martin K.
4
O livro “Effective Java” recomenda evitar grupos de threads (item 73).
Bastien Léonard
2
Os bugs mencionados em Effective Java deveriam ter sido corrigidos no Java 6. Se as versões mais recentes do Java não forem uma restrição, é melhor usar Futures para resolver problemas de thread. Martin v. Löwis: Você está certo. Não é relevante para esse problema, mas é bom obter mais informações sobre as threads em execução de um objeto (como o ExecutorService). Acho que é bom usar determinados recursos para resolver um problema; talvez você precise de mais flexibilidade (informações de thread) no futuro. Também é certo mencionar as antigas classes de bugs em JDKs mais antigos.
Martin K.
5
ThreadGroup não implementa uma junção em nível de grupo, então porque as pessoas estão empurrando ThreadGroup é um pouco confuso. As pessoas estão realmente usando spin locks e consultando o activeCount do grupo? Você teria dificuldade em me convencer de que fazer isso é melhor de alguma forma, em comparação com apenas chamar join em todos os threads.
41

Uma maneira seria criar um Listde Threads, criar e lançar cada tópico, enquanto o adicionava à lista. Depois que tudo for iniciado, volte para a lista e chame join()cada um. Não importa em que ordem os threads terminam de ser executados, tudo o que você precisa saber é que, no momento em que o segundo loop termina, cada thread terá sido concluída.

Uma abordagem melhor é usar um ExecutorService e seus métodos associados:

List<Callable> callables = ... // assemble list of Callables here
                               // Like Runnable but can return a value
ExecutorService execSvc = Executors.newCachedThreadPool();
List<Future<?>> results = execSvc.invokeAll(callables);
// Note: You may not care about the return values, in which case don't
//       bother saving them

Usar um ExecutorService (e todas as coisas novas dos utilitários de simultaneidade do Java 5 ) é incrivelmente flexível, e o exemplo acima mal arranha a superfície.

Adam Batkin
fonte
ThreadGroup é o caminho a seguir! Com uma lista mutável, você terá problemas (sincronização)
Martin K.
3
O que? Como você se meteria em problemas? É apenas mutável (apenas legível) pelo thread que está iniciando, então, desde que ele não modifique a lista durante a iteração por ela, tudo bem.
Adam Batkin
Depende de como você o usa. Se você usar a classe de chamada em um thread, terá problemas.
Martin K.
27
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DoSomethingInAThread implements Runnable
{
   public static void main(String[] args) throws ExecutionException, InterruptedException
   {
      //limit the number of actual threads
      int poolSize = 10;
      ExecutorService service = Executors.newFixedThreadPool(poolSize);
      List<Future<Runnable>> futures = new ArrayList<Future<Runnable>>();

      for (int n = 0; n < 1000; n++)
      {
         Future f = service.submit(new DoSomethingInAThread());
         futures.add(f);
      }

      // wait for all tasks to complete before continuing
      for (Future<Runnable> f : futures)
      {
         f.get();
      }

      //shut down the executor service so that this thread can exit
      service.shutdownNow();
   }

   public void run()
   {
      // do something here
   }
}
jt.
fonte
funcionou como um encanto ... eu tenho dois conjuntos de threads que não devem ser executados simultaneamente devido a problemas em vários cookies. Eu usei seu exemplo para executar um conjunto de threads de cada vez .. obrigado por compartilhar seu conhecimento ...
arn-arn
@Dantalian - Em sua classe Runnable (provavelmente no método run), você deseja capturar todas as exceções que ocorreram e armazená-las localmente (ou armazenar uma mensagem / condição de erro). No exemplo, f.get () retorna o objeto que você enviou ao ExecutorService. Seu objeto pode ter um método para recuperar quaisquer exceções / condições de erro. Dependendo de como você modifica o exemplo fornecido, pode ser necessário converter o objeto transformado por f.get () em seu tipo esperado.
jt.
12

em vez de join(), que é uma API antiga, você pode usar CountDownLatch . Modifiquei seu código conforme abaixo para atender às suas necessidades.

import java.util.concurrent.*;
class DoSomethingInAThread implements Runnable{
    CountDownLatch latch;
    public DoSomethingInAThread(CountDownLatch latch){
        this.latch = latch;
    } 
    public void run() {
        try{
            System.out.println("Do some thing");
            latch.countDown();
        }catch(Exception err){
            err.printStackTrace();
        }
    }
}

public class CountDownLatchDemo {
    public static void main(String[] args) {
        try{
            CountDownLatch latch = new CountDownLatch(1000);
            for (int n=0; n<1000; n++) {
                Thread t = new Thread(new DoSomethingInAThread(latch));
                t.start();
            }
            latch.await();
            System.out.println("In Main thread after completion of 1000 threads");
        }catch(Exception err){
            err.printStackTrace();
        }
    }
}

Explicação :

  1. CountDownLatch foi inicializado com a contagem fornecida 1000 de acordo com sua exigência.

  2. Cada thread de trabalho DoSomethingInAThread diminuirá o CountDownLatch, que foi passado no construtor.

  3. Tópico principal CountDownLatchDemo await()até a contagem chegar a zero. Quando a contagem chegar a zero, você ficará abaixo da linha na saída.

    In Main thread after completion of 1000 threads

Mais informações na página de documentação do oracle

public void await()
           throws InterruptedException

Faz com que o encadeamento atual espere até que a trava tenha contado até zero, a menos que o encadeamento seja interrompido.

Consulte a pergunta SE relacionada para outras opções:

espere até que todos os threads terminem seu trabalho em java

Ravindra babu
fonte
8

Evite a classe Thread e, em vez disso, use as abstrações superiores fornecidas em java.util.concurrent

A classe ExecutorService fornece o método invokeAll que parece fazer exatamente o que você deseja.

Henrik
fonte
6

Considere usar java.util.concurrent.CountDownLatch. Exemplos em javadocs

Pablo Cavalieri
fonte
É uma trava para threads, a trava funciona com uma contagem regressiva. No método run () de seu thread, declare explicitamente esperar que um CountDownLatch alcance sua contagem regressiva para 0. Você pode usar o mesmo CountDownLatch em mais de um thread para liberá-los simultaneamente. Não sei se é o que você precisa, só queria mencionar porque é útil quando se trabalha em um ambiente multithread.
Pablo Cavalieri
Talvez você deva colocar essa explicação no corpo da sua resposta.
Aaron Hall
Os exemplos no Javadoc são muito descritivos, por isso não adicionei nenhum. docs.oracle.com/javase/7/docs/api/java/util/concurrent/… . No primeiro exemplo, todos os threads de trabalho são liberados simultaneamente porque esperam que o CountdownLatch startSignal chegue a zero, o que acontece em startSignal.countDown (). Então, a thread mian espera até que todos os trabalhos terminem usando a instrução doneSignal.await (). doneSignal diminui seu valor em cada trabalhador.
Pablo Cavalieri
6

Como sugeriu Martin K java.util.concurrent.CountDownLatchparece ser a melhor solução para isso. Apenas adicionando um exemplo para o mesmo

     public class CountDownLatchDemo
{

    public static void main (String[] args)
    {
        int noOfThreads = 5;
        // Declare the count down latch based on the number of threads you need
        // to wait on
        final CountDownLatch executionCompleted = new CountDownLatch(noOfThreads);
        for (int i = 0; i < noOfThreads; i++)
        {
            new Thread()
            {

                @Override
                public void run ()
                {

                    System.out.println("I am executed by :" + Thread.currentThread().getName());
                    try
                    {
                        // Dummy sleep
                        Thread.sleep(3000);
                        // One thread has completed its job
                        executionCompleted.countDown();
                    }
                    catch (InterruptedException e)
                    {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

            }.start();
        }

        try
        {
            // Wait till the count down latch opens.In the given case till five
            // times countDown method is invoked
            executionCompleted.await();
            System.out.println("All over");
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }

}
Freaky Thommi
fonte
4

Dependendo de suas necessidades, você também pode verificar as classes CountDownLatch e CyclicBarrier no pacote java.util.concurrent. Eles podem ser úteis se você quiser que seus threads esperem uns pelos outros, ou se quiser um controle mais refinado sobre a maneira como seus threads são executados (por exemplo, aguardando em sua execução interna por outro thread para definir algum estado). Você também pode usar um CountDownLatch para sinalizar todos os seus threads para iniciar ao mesmo tempo, em vez de iniciá-los um por um conforme você itera em seu loop. Os documentos padrão da API têm um exemplo disso, além de usar outro CountDownLatch para esperar que todos os threads concluam sua execução.

Jeff
fonte
1

Crie o objeto thread dentro do primeiro loop for.

for (int i = 0; i < threads.length; i++) {
     threads[i] = new Thread(new Runnable() {
         public void run() {
             // some code to run in parallel
         }
     });
     threads[i].start();
 }

E então o que todos aqui estão dizendo.

for(i = 0; i < threads.length; i++)
  threads[i].join();
optimus0127
fonte
0

Você pode fazer isso com o objeto "ThreadGroup" e seu parâmetro activeCount :

Martin K.
fonte
Não tenho certeza de como exatamente você se propõe a fazer isso. Se você propõe pesquisar activeCount em um loop: isso é ruim, uma vez que é ocupado-esperar (mesmo se você dormir entre as pesquisas - você terá uma troca entre negócios e capacidade de resposta).
Martin v. Löwis
@Martin v. Löwis: "Join irá esperar por apenas um único thread. Uma solução melhor pode ser java.util.concurrent.CountDownLatch. Simplesmente inicialize a trava com a contagem definida para o número de threads de trabalho. Cada thread de trabalho deve chamar countDown () logo antes de sair, e o thread principal simplesmente chama await (), que será bloqueado até que o contador chegue a zero. O problema com join () também é que você não pode começar a adicionar mais threads dinamicamente. A lista vai explodir com uma modificação simultânea. " Sua solução funciona bem para o problema, mas não para o propósito geral.
Martin K.
0

Como alternativa ao CountDownLatch, você também pode usar CyclicBarrier, por exemplo

public class ThreadWaitEx {
    static CyclicBarrier barrier = new CyclicBarrier(100, new Runnable(){
        public void run(){
            System.out.println("clean up job after all tasks are done.");
        }
    });
    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(new MyCallable(barrier));
            t.start();
        }       
    }

}    

class MyCallable implements Runnable{
    private CyclicBarrier b = null;
    public MyCallable(CyclicBarrier b){
        this.b = b;
    }
    @Override
    public void run(){
        try {
            //do something
            System.out.println(Thread.currentThread().getName()+" is waiting for barrier after completing his job.");
            b.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }       
}

Para usar CyclicBarrier, neste caso, o tipo de barreira.await () deve ser a última instrução, ou seja, quando sua thread terminar seu trabalho. CyclicBarrier pode ser usado novamente com seu método reset (). Para citar javadocs:

Um CyclicBarrier oferece suporte a um comando Runnable opcional que é executado uma vez por ponto de barreira, após a chegada do último encadeamento no grupo, mas antes de qualquer encadeamento ser liberado. Esta ação de barreira é útil para atualizar o estado compartilhado antes que qualquer uma das partes continue.

shailendra1118
fonte
Não acho que seja um bom exemplo para CyclicBarrier. Por que você usa uma chamada Thread.sleep ()?
Guenther
@Guenther - sim, mudei o código para atender ao requisito.
shailendra1118,
CyclicBarrier não é uma alternativa para CountDownLatch. Quando os encadeamentos devem fazer uma contagem regressiva repetidamente, você deve criar um CyclicBarrier, caso contrário, o padrão é CountDownLatch (a menos que exija abstração adicional de Execução, ponto em que você deve olhar para os Serviços de nível superior).
Elysiumplain
0

Não join()foi útil para mim. veja este exemplo em Kotlin:

    val timeInMillis = System.currentTimeMillis()
    ThreadUtils.startNewThread(Runnable {
        for (i in 1..5) {
            val t = Thread(Runnable {
                Thread.sleep(50)
                var a = i
                kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
                Thread.sleep(200)
                for (j in 1..5) {
                    a *= j
                    Thread.sleep(100)
                    kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
                }
                kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
            })
            t.start()
        }
    })

O resultado:

Thread-5|a=5
Thread-1|a=1
Thread-3|a=3
Thread-2|a=2
Thread-4|a=4
Thread-2|2*1=2
Thread-3|3*1=3
Thread-1|1*1=1
Thread-5|5*1=5
Thread-4|4*1=4
Thread-1|2*2=2
Thread-5|10*2=10
Thread-3|6*2=6
Thread-4|8*2=8
Thread-2|4*2=4
Thread-3|18*3=18
Thread-1|6*3=6
Thread-5|30*3=30
Thread-2|12*3=12
Thread-4|24*3=24
Thread-4|96*4=96
Thread-2|48*4=48
Thread-5|120*4=120
Thread-1|24*4=24
Thread-3|72*4=72
Thread-5|600*5=600
Thread-4|480*5=480
Thread-3|360*5=360
Thread-1|120*5=120
Thread-2|240*5=240
Thread-1|TaskDurationInMillis = 765
Thread-3|TaskDurationInMillis = 765
Thread-4|TaskDurationInMillis = 765
Thread-5|TaskDurationInMillis = 765
Thread-2|TaskDurationInMillis = 765

Agora, deixe-me usar o join()para tópicos:

    val timeInMillis = System.currentTimeMillis()
    ThreadUtils.startNewThread(Runnable {
        for (i in 1..5) {
            val t = Thread(Runnable {
                Thread.sleep(50)
                var a = i
                kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
                Thread.sleep(200)
                for (j in 1..5) {
                    a *= j
                    Thread.sleep(100)
                    kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
                }
                kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
            })
            t.start()
            t.join()
        }
    })

E o resultado:

Thread-1|a=1
Thread-1|1*1=1
Thread-1|2*2=2
Thread-1|6*3=6
Thread-1|24*4=24
Thread-1|120*5=120
Thread-1|TaskDurationInMillis = 815
Thread-2|a=2
Thread-2|2*1=2
Thread-2|4*2=4
Thread-2|12*3=12
Thread-2|48*4=48
Thread-2|240*5=240
Thread-2|TaskDurationInMillis = 1568
Thread-3|a=3
Thread-3|3*1=3
Thread-3|6*2=6
Thread-3|18*3=18
Thread-3|72*4=72
Thread-3|360*5=360
Thread-3|TaskDurationInMillis = 2323
Thread-4|a=4
Thread-4|4*1=4
Thread-4|8*2=8
Thread-4|24*3=24
Thread-4|96*4=96
Thread-4|480*5=480
Thread-4|TaskDurationInMillis = 3078
Thread-5|a=5
Thread-5|5*1=5
Thread-5|10*2=10
Thread-5|30*3=30
Thread-5|120*4=120
Thread-5|600*5=600
Thread-5|TaskDurationInMillis = 3833

Como fica claro quando usamos join:

  1. Os threads estão sendo executados sequencialmente.
  2. A primeira amostra leva 765 milissegundos, enquanto a segunda amostra leva 3833 milissegundos.

Nossa solução para evitar o bloqueio de outros threads foi criar um ArrayList:

val threads = ArrayList<Thread>()

Agora, quando queremos iniciar um novo thread, devemos adicioná-lo ao ArrayList:

addThreadToArray(
    ThreadUtils.startNewThread(Runnable {
        ...
    })
)

A addThreadToArrayfunção:

@Synchronized
fun addThreadToArray(th: Thread) {
    threads.add(th)
}

A startNewThreadfunção:

fun startNewThread(runnable: Runnable) : Thread {
    val th = Thread(runnable)
    th.isDaemon = false
    th.priority = Thread.MAX_PRIORITY
    th.start()
    return th
}

Verifique a conclusão dos threads conforme abaixo em todos os lugares necessários:

val notAliveThreads = ArrayList<Thread>()
for (t in threads)
    if (!t.isAlive)
        notAliveThreads.add(t)
threads.removeAll(notAliveThreads)
if (threads.size == 0){
    // The size is 0 -> there is no alive threads.
}
Misagh
fonte