Multiprocessamento: use tqdm para exibir uma barra de progresso

97

Para tornar meu código mais "pythônico" e mais rápido, eu uso "multiprocessamento" e uma função de mapa para enviar a) a função eb) o intervalo de iterações.

A solução implantada (ou seja, chame tqdm diretamente no intervalo tqdm.tqdm (intervalo (0, 30)) não funciona com multiprocessamento (conforme formulado no código abaixo).

A barra de progresso é exibida de 0 a 100% (quando o python lê o código?), Mas não indica o progresso real da função do mapa.

Como exibir uma barra de progresso que indica em qual etapa está a função 'mapa'?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   p = Pool(2)
   r = p.map(_foo, tqdm.tqdm(range(0, 30)))
   p.close()
   p.join()

Qualquer ajuda ou sugestão é bem vinda ...

SciPy
fonte
Você pode postar o trecho de código da barra de progresso?
Alex
1
Para pessoas em busca de uma solução com .starmap(): Aqui está um patch para Pooladicionar .istarmap(), que também funcionará tqdm.
Darkonaut

Respostas:

127

Use imap em vez de map, que retorna um iterador de valores processados.

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   with Pool(2) as p:
      r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
Hkyi
fonte
13
Uma instrução list () envolvente aguarda o término do iterador. total = também é necessário, pois tqdm não sabe quanto tempo a iteração durará,
hkyi
13
Existe uma solução semelhante para starmap()?
tarashypka de
1
for i in tqdm.tqdm(...): pass pode ser mais direto, quelist(tqdm.tqdm)
savfod 02 de
1
Isso funciona, mas mais alguém imprimiu continuamente a barra de progresso em uma nova linha para cada iteração?
Dennis Subachev
3
O comportamento é conectado quando específico chunk_sizede p.imap. Pode tqdmatualizar cada iteração em vez de cada pedaço?
huangbiubiu
49

Solução encontrada: tenha cuidado! Devido ao multiprocessamento, o tempo de estimativa (iteração por loop, tempo total, etc.) pode ser instável, mas a barra de progresso funciona perfeitamente.

Observação: o gerenciador de contexto para Pool está disponível apenas a partir do Python versão 3.3

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))):
                pbar.update()
SciPy
fonte
2
pbar.close()não é necessário, será fechado automaticamente no encerramento dewith
Sagar Kar
5
A segunda tqdmchamada / chamada interna é necessária aqui?
shadowtalker
5
e a saída de _foo (my_number) que é retornado como "r" em questão?
Likak de
3
Existe uma solução semelhante para starmap()?
tarashypka de
2
@shadowtalker - parece funcionar sem;). Enfim - imap_unorderedé a chave aqui, dá melhor desempenho e melhores estimativas de barra de progresso.
Tomasz Gandor
19

Você pode usar em seu p_tqdmlugar.

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = p_map(_foo, list(range(0, 30)))
Victor Quach
fonte
1
Isso funciona muito bem e é muito fácil de fazer pip install. Isso está substituindo tqdm para a maioria das minhas necessidades
crypdick
Merci Victor;)
Gabriel Romon
p_tqdmestá limitado a multiprocessing.Pool, não disponível para tópicos
pateheo
17

Desculpe pelo atraso, mas se você só precisa de um mapa simultâneo, a versão mais recente ( tqdm>=4.42.0) agora tem este integrado:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = process_map(_foo, range(0, 30), max_workers=2)

Referências: https://tqdm.github.io/docs/contrib.concurrent/ e https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

casper.dcl
fonte
Obrigado por isso. Funciona facilmente, muito melhor do que qualquer outra solução que tentei.
user3340499
Legal (+1), mas joga HBox(children=(FloatProgress(value=0.0, max=30.0), HTML(value='')))Jupyter
Ébe Isaac
@ Ébe-Isaac, consulte github.com/tqdm/tqdm/issues/937
casper.dcl
Vejo um problema com a discussão para hackear tqdm_notebook, no entanto, não consigo encontrar uma solução para resolver para tqdm.contrib.concurrent.
Ébe Isaac
8

com base na resposta de Xavi Martínez escrevi a função imap_unordered_bar. Pode ser usado da mesma forma, imap_unorderedcom a única diferença de que uma barra de processamento é exibida.

from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes = 2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total = len(args)) as pbar:
        for i, res in tqdm(enumerate(p.imap_unordered(func, args))):
            pbar.update()
            res_list.append(res)
    pbar.close()
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))
Oliver Wilken
fonte
3
Isso redesenhará a barra a cada etapa em uma nova linha. Como atualizar a mesma linha?
misantroop
Solução no meu caso (Windows / Powershell): Colorama.
misantroop
'pbar.close () não é necessário, será fechado automaticamente no encerramento de com' como o comentário que Sagar fez na resposta de @scipy
Tejas Shetty
0
import multiprocessing as mp
import tqdm


some_iterable = ...

def some_func():
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(mp.cpu_count()-2) as p:
        list(tqdm.tqdm(p.imap(some_func, iterable), total=len(iterable)))
dkrynicki
fonte
0

Aqui está minha opinião sobre quando você precisa obter resultados de suas funções de execução paralela. Essa função faz algumas coisas (há outro post meu que explica isso melhor), mas o ponto principal é que há uma fila de tarefas pendentes e uma fila de tarefas concluídas. Conforme os trabalhadores concluem cada tarefa na fila pendente, eles adicionam os resultados na fila de tarefas concluídas. Você pode agrupar a verificação na fila de tarefas concluídas com a barra de progresso tqdm. Não estou colocando a implementação da função do_work () aqui, não é relevante, já que a mensagem aqui é monitorar a fila de tarefas concluídas e atualizar a barra de progresso sempre que um resultado for obtido.

def par_proc(job_list, num_cpus=None, verbose=False):

# Get the number of cores
if not num_cpus:
    num_cpus = psutil.cpu_count(logical=False)

print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))

# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()

# Gather processes and results here
processes = []
results = []

# Count tasks
num_tasks = 0

# Add the tasks to the queue
for job in job_list:
    for task in job['tasks']:
        expanded_job = {}
        num_tasks = num_tasks + 1
        expanded_job.update({'func': pickle.dumps(job['func'])})
        expanded_job.update({'task': task})
        tasks_pending.put(expanded_job)

# Set the number of workers here
num_workers = min(num_cpus, num_tasks)

# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
    tasks_pending.put(SENTINEL)

print('* Number of tasks: {}'.format(num_tasks))

# Set-up and start the workers
for c in range(num_workers):
    p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
    p.name = 'worker' + str(c)
    processes.append(p)
    p.start()

# Gather the results
completed_tasks_counter = 0

with tqdm(total=num_tasks) as bar:
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1
        bar.update(completed_tasks_counter)

for p in processes:
    p.join()

return results
Nick B.
fonte
-2

Essa abordagem é simples e funciona.

from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)
    pbar.update()

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    pool.close()
    pool.join()
Vijayabhaskar J
fonte