multithreading python esperar até que todos os threads terminem

119

Isso pode ter sido perguntado em um contexto semelhante, mas não consegui encontrar uma resposta após cerca de 20 minutos de pesquisa, então vou perguntar.

Eu escrevi um script Python (digamos: scriptA.py) e um script (digamos scriptB.py)

No scriptB, quero chamar o scriptA várias vezes com argumentos diferentes, cada vez que leva cerca de uma hora para ser executado (é um script enorme, faz muitas coisas ... não se preocupe com isso) e quero ser capaz de executar o scriptA com todos os diferentes argumentos simultaneamente, mas preciso esperar até que TODOS eles sejam concluídos antes de continuar; meu código:

import subprocess

#setup
do_setup()

#run scriptA
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)

#finish
do_finish()

Eu quero executar tudo subprocess.call()ao mesmo tempo e, em seguida, esperar até que tudo esteja pronto, como devo fazer isso?

Tentei usar threading como o exemplo aqui :

from threading import Thread
import subprocess

def call_script(args)
    subprocess.call(args)

#run scriptA   
t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))
t1.start()
t2.start()
t3.start()

Mas eu não acho que isso esteja certo.

Como posso saber se todos eles terminaram de correr antes de ir para o meu do_finish()?

Inbar Rose
fonte

Respostas:

150

Você precisa usar o método de junção do Threadobjeto no final do script.

t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

Assim, o segmento principal vai esperar até t1, t2e t3terminar a execução.

Maksim Skurydzin
fonte
5
hmmm - tendo problemas para entender alguma coisa, isso não vai rodar primeiro t1, esperar até o fim e depois ir para t2..etc, etc? como fazer tudo acontecer de uma vez? não vejo como isso os executaria ao mesmo tempo?
Inbar Rose de
25
A chamada para joinblocos até que o thread termine a execução. Você terá que esperar por todos os threads de qualquer maneira. Se t1terminar primeiro, você começará a esperar t2(o que pode já ter sido concluído e você irá imediatamente prosseguir com a espera t3). Se t1demorou mais para ser executado, quando você retornar dele, ambos t1e t2retornará imediatamente sem bloqueio.
Maksim Skurydzin
1
você não entende minha pergunta - se eu copiar o código acima para o meu código - vai funcionar? Ou eu estou esquecendo de alguma coisa?
Inbar Rose de
2
Ok, eu entendo. agora eu entendo, estava um pouco confuso sobre isso, mas acho que entendi, joinmeio que anexa o processo atual ao thread e espera até que seja concluído, e se t2 terminar antes de t1 então quando t1 for feito ele irá verificar se t2 está sendo feito, veja que é e, em seguida, verifique t3..etc..etc .. e somente quando tudo estiver concluído, ele continuará. impressionante.
Inbar Rose de
3
digamos que t1 leva mais tempo, mas t2 tem uma exceção. O que acontece depois? você pode pegar essa exceção ou verificar se t2 terminou ok ou não?
Ciprian Tomoiagă
173

Coloque os tópicos em uma lista e use o método Join

 threads = []

 t = Thread(...)
 threads.append(t)

 ...repeat as often as necessary...

 # Start all threads
 for x in threads:
     x.start()

 # Wait for all of them to finish
 for x in threads:
     x.join()
Aaron Digulla
fonte
1
Sim, isso funcionaria, mas é mais difícil de entender. Você deve sempre tentar encontrar um equilíbrio entre código compacto e "legibilidade". Lembre-se: o código é escrito uma vez, mas lido muitas vezes. Portanto, é mais importante que seja fácil de entender.
Aaron Digulla de
2
O "padrão de fábrica" ​​não é algo que posso explicar em uma frase. Procure no Google por stackoverflow.com. Existem muitos exemplos e explicações. Resumindo: você escreve um código que constrói algo complexo para você. Como uma fábrica real: você dá um pedido e recebe de volta o produto acabado.
Aaron Digulla de
18
Não gosto da ideia de usar a compreensão de lista para seus efeitos colaterais e não fazer nada de útil com a lista de resultados. Um loop for simples seria mais limpo mesmo se espalhasse duas linhas ...
Ioan Alexandru Cucu
1
@Aaron DIgull Eu entendo isso. O que quero dizer é que eu faria apenas uma compreensão for x in threads: x.join()de lista em vez de usar
Ioan Alexandru Cucu
1
@IoanAlexandruCucu: Ainda estou me perguntando se existe uma solução mais legível e eficiente: stackoverflow.com/questions/21428602/…
Aaron Digulla
29

Em Python3, desde Python 3.2 há uma nova abordagem para chegar ao mesmo resultado, que eu pessoalmente prefiro ao pacote tradicional de criação / início / junção de thread concurrent.futures: https://docs.python.org/3/library/concurrent.futures .html

Usando um, ThreadPoolExecutoro código seria:

from concurrent.futures.thread import ThreadPoolExecutor
import time

def call_script(ordinal, arg):
    print('Thread', ordinal, 'argument:', arg)
    time.sleep(2)
    print('Thread', ordinal, 'Finished')

args = ['argumentsA', 'argumentsB', 'argumentsC']

with ThreadPoolExecutor(max_workers=2) as executor:
    ordinal = 1
    for arg in args:
        executor.submit(call_script, ordinal, arg)
        ordinal += 1
print('All tasks has been finished')

A saída do código anterior é algo como:

Thread 1 argument: argumentsA
Thread 2 argument: argumentsB
Thread 1 Finished
Thread 2 Finished
Thread 3 argument: argumentsC
Thread 3 Finished
All tasks has been finished

Uma das vantagens é que você pode controlar a taxa de transferência configurando o máximo de workers simultâneos.

Roberto
fonte
mas como você pode saber quando todos os threads no threadpool terminaram?
Prime By Design
1
Como você pode ver no exemplo, o código após a withinstrução é executado quando todas as tarefas são concluídas.
Roberto,
isso não funciona. Tente fazer algo muito longo em tópicos. Sua declaração de impressão será executada antes do
término do
@Pranalee, Esse código funciona, atualizei o código para adicionar as linhas de saída. Você não pode ver "Todas as tarefas ..." antes de todos os threads serem concluídos. É assim que a withinstrução funciona por design neste caso. De qualquer forma, você sempre pode abrir uma nova pergunta no SO e postar seu código para que possamos ajudá-lo a descobrir o que está acontecendo no seu caso.
Roberto
@PrimeByDesign você pode usar a concurrent.futures.waitfunção, você pode ver um exemplo real aqui Documentos oficiais: docs.python.org/3/library/…
Alexander Fortin
28

Eu prefiro usar a compreensão de lista com base em uma lista de entrada:

inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]
Adam Matan
fonte
A resposta marcada explica bem, mas esta é mais curta e não requer repetições feias. Apenas uma boa resposta. :)
tleb
A compreensão da lista apenas para efeitos colaterais é geralmente depreciada *. Mas, neste caso de uso, parece uma boa ideia. * stackoverflow.com/questions/5753597/…
Vinayak Kaniyarakkal
3
@VinayakKaniyarakkal for t in threads:t.start()não é melhor?
SmartManoj
5

Você pode ter uma aula como abaixo, da qual você pode adicionar 'n' número de funções ou console_scripts que deseja executar paralelamente e iniciar a execução e esperar que todos os trabalhos sejam concluídos.

from multiprocessing import Process

class ProcessParallel(object):
    """
    To Process the  functions parallely

    """    
    def __init__(self, *jobs):
        """
        """
        self.jobs = jobs
        self.processes = []

    def fork_processes(self):
        """
        Creates the process objects for given function deligates
        """
        for job in self.jobs:
            proc  = Process(target=job)
            self.processes.append(proc)

    def start_all(self):
        """
        Starts the functions process all together.
        """
        for proc in self.processes:
            proc.start()

    def join_all(self):
        """
        Waits untill all the functions executed.
        """
        for proc in self.processes:
            proc.join()


def two_sum(a=2, b=2):
    return a + b

def multiply(a=2, b=2):
    return a * b


#How to run:
if __name__ == '__main__':
    #note: two_sum, multiply can be replace with any python console scripts which
    #you wanted to run parallel..
    procs =  ProcessParallel(two_sum, multiply)
    #Add all the process in list
    procs.fork_processes()
    #starts  process execution 
    procs.start_all()
    #wait until all the process got executed
    procs.join_all()
PBD
fonte
Isso é multiprocessamento. A pergunta era sobre docs.python.org/3/library/threading.html
Rustam A.
3

Da threading documentação do módulo

Existe um objeto “thread principal”; isso corresponde ao thread inicial de controle no programa Python. Não é um thread daemon.

Existe a possibilidade de que “objetos de thread fictícios” sejam criados. Estes são objetos de thread correspondentes a “threads estranhos”, que são threads de controle iniciados fora do módulo de threading, como diretamente do código C. Objetos de thread fictícios têm funcionalidade limitada; eles são sempre considerados vivos e demoníacos e não podem ser join()editados. Eles nunca são excluídos, uma vez que é impossível detectar o término de threads estranhos.

Então, para pegar aqueles dois casos em que você não está interessado em manter uma lista dos threads que você cria:

import threading as thrd


def alter_data(data, index):
    data[index] *= 2


data = [0, 2, 6, 20]

for i, value in enumerate(data):
    thrd.Thread(target=alter_data, args=[data, i]).start()

for thread in thrd.enumerate():
    if thread.daemon:
        continue
    try:
        thread.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err.args[0]:
            # catchs main thread
            continue
        else:
            raise

Portanto:

>>> print(data)
[0, 4, 12, 40]
berna1111
fonte
2

Talvez, algo como

for t in threading.enumerate():
    if t.daemon:
        t.join()
jno
fonte
Eu tentei este código, mas não tenho certeza sobre seu funcionamento porque a última instrução do meu código foi impressa após esse loop for e ainda assim o processo não foi encerrado.
Omkar
1

Acabei de me deparar com o mesmo problema em que precisava esperar por todos os threads que foram criados usando o loop for. Acabei de experimentar o seguinte trecho de código. Pode não ser a solução perfeita, mas pensei que seria uma solução simples testar:

for t in threading.enumerate():
    try:
        t.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err:
            continue
        else:
            raise
Omkar
fonte