Indicador de progresso durante operações de pandas

158

Realizo regularmente operações de pandas em quadros de dados com mais de 15 milhões de linhas e gostaria de ter acesso a um indicador de progresso para operações específicas.

Existe um indicador de progresso baseado em texto para operações de divisão de aplicação e combinação de pandas?

Por exemplo, em algo como:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

onde feature_rollupé uma função um pouco envolvida que pega muitas colunas do DF e cria novas colunas de usuário por meio de vários métodos. Essas operações podem demorar um pouco para grandes quadros de dados, então eu gostaria de saber se é possível ter uma saída baseada em texto em um notebook iPython que me atualize sobre o progresso.

Até agora, tentei indicadores de progresso de loop canônico para Python, mas eles não interagem com os pandas de maneira significativa.

Espero que haja algo que eu tenha esquecido na biblioteca / documentação do pandas que permita conhecer o progresso de uma combinação de aplicar e dividir. Uma implementação simples talvez analise o número total de subconjuntos de quadros de dados nos quais a applyfunção está funcionando e relate o progresso como a fração concluída desses subconjuntos.

Talvez isso seja algo que precise ser adicionado à biblioteca?

cwharland
fonte
você fez uma% poda (perfil) no código? às vezes você pode fazer operações em toda a estrutura antes de aplicar para eliminar gargalos
Jeff
@ Jeff: você pode apostar, eu fiz isso antes para extrair cada pedaço de desempenho dele. A questão realmente se resume ao limite de pseudo-redução de mapas em que estou trabalhando, já que as fileiras estão na casa das dezenas de milhões, por isso não espero que a super velocidade aumente, só quero um feedback sobre o progresso.
Cwharland # 4/13
Considere a codificação: pandas.pydata.org/pandas-docs/dev/…
Andy Hayden
@ AndyHayden - Como eu comentei sua resposta, sua implementação é bastante boa e adiciona uma pequena quantidade de tempo ao trabalho geral. Eu também citei três operações dentro do pacote cumulativo de recursos, que recuperou o tempo todo, agora dedicado ao progresso dos relatórios. Então, no final, aposto que terei barras de progresso com uma redução no tempo total de processamento se eu seguir com o cython em toda a função.
cwharland

Respostas:

277

Devido à demanda popular, tqdmadicionou suporte para pandas. Diferentemente das outras respostas, isso não deixará os pandas mais lentos para baixo - eis um exemplo para DataFrameGroupBy.progress_apply:

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))

# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

Caso você esteja interessado em saber como isso funciona (e como modificá-lo para seus próprios retornos de chamada), veja os exemplos no github , a documentação completa sobre o pypi ou importe o módulo e execute-o help(tqdm).

EDITAR


Para responder diretamente à pergunta original, substitua:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

com:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

Nota: tqdm <= v4.8 : Para versões do tqdm abaixo de 4.8, em vez de tqdm.pandas()você precisar:

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())
casper.dcl
fonte
5
tqdmfoi realmente criado para iterables apenas simples originalmente: from tqdm import tqdm; for i in tqdm( range(int(1e8)) ): passO apoio pandas foi um hack recente eu fiz :)
casper.dcl
6
Aliás, se você usa os notebooks Jupyter, também pode usar tqdm_notebooks para obter uma barra mais bonita. Juntamente com pandas você atualmente precisa instanciar-lo como from tqdm import tqdm_notebook; tqdm_notebook().pandas(*args, **kwargs) vemos aqui
grinsbaeckchen
2
A partir da versão 4.8.1 - use tqdm.pandas (). github.com/tqdm/tqdm/commit/…
mork
1
Obrigado, @mork está correto. Estamos trabalhando (lentamente) para a tqdmv5, que torna as coisas mais modularizadas.
Casper.dcl #
1
Para recomendações recentes de sintaxe, consulte a documentação do tqdm Pandas aqui: pypi.python.org/pypi/tqdm#pandas-integration
Manu CJ
18

Ajustar a resposta de Jeff (e ter isso como uma função reutilizável).

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    import sys
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res

Nota: aplique as atualizações de porcentagem de progresso em linha . Se sua função persistir, isso não funcionará.

In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...

Como de costume, você pode adicionar isso ao seu grupo por objetos como um método:

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...

Como mencionado nos comentários, esse não é um recurso que os principais pandas estariam interessados ​​em implementar. Mas o python permite que você os crie para muitos objetos / métodos do pandas (isso seria um pouco de trabalho ... embora você deva ser capaz de generalizar essa abordagem).

Andy Hayden
fonte
Digo "bastante trabalho", mas você provavelmente poderia reescrever toda essa função como decorador (mais geral).
Andy Hayden
Obrigado por expandir na publicação de Jeff. Eu implementei ambos e a desaceleração para cada um é bastante mínima (adicionei um total de 1,1 min a uma operação que levou 27 min para ser concluída). Dessa forma, posso ver o progresso e, dada a natureza adhoc dessas operações, acho que é uma desaceleração aceitável.
Cwharland
Excelente, feliz por ter ajudado. Fiquei realmente surpreso com a desaceleração (quando tentei um exemplo), esperava que fosse muito pior.
Andy Hayden
1
Para aumentar ainda mais a eficiência dos métodos publicados, eu estava com preguiça quanto à importação de dados (o pandas é muito bom em lidar com csv bagunçado !!) e algumas das minhas entradas (~ 1%) haviam destruído completamente as inserções (pense em todo registros inseridos em campos únicos). A eliminação dessas causas acelera enormemente o pacote cumulativo de recursos, pois não havia ambiguidade sobre o que fazer durante as operações de combinação com aplicação dividida.
Cwharland
1
Estou com menos de 8 minutos ... mas adicionei algumas coisas ao pacote cumulativo de recursos (mais recursos -> melhor AUC!). Esses 8 minutos são por bloco (total de dois blocos agora) com cada bloco na faixa de 12 milhões de linhas. Então, sim ... 16 minutos para realizar operações pesadas em 24 milhões de linhas usando o HDFStore (e há coisas nltk no pacote cumulativo de recursos). Muito bom. Vamos esperar que a internet não me julgar sobre a ignorância inicial ou ambivalência em relação as inserções desarrumada =)
cwharland
11

Caso você precise de suporte para como usar isso em um notebook Jupyter / ipython, como eu fiz, aqui está um guia e uma fonte úteis para o artigo relevante :

from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

Observe o sublinhado na instrução de importação para _tqdm_notebook. Como o artigo referenciado menciona, o desenvolvimento está no estágio beta tardio.

Victor Vulovic
fonte
8

Para quem deseja aplicar o tqdm em seu código de aplicação de pandas paralelos personalizado.

(Tentei algumas das bibliotecas para paralelização ao longo dos anos, mas nunca encontrei uma solução 100% de paralelização, principalmente para a função de aplicação, e sempre tive que voltar para o meu código "manual".)

df_multi_core - este é o que você chama. Aceita:

  1. Seu objeto df
  2. O nome da função que você deseja chamar
  3. O subconjunto de colunas em que a função pode ser executada (ajuda a reduzir o tempo / memória)
  4. O número de tarefas a serem executadas em paralelo (-1 ou omitir para todos os núcleos)
  5. Quaisquer outros kwargs aceitos pela função do df (como "eixo")

_df_split - esta é uma função auxiliar interna que deve ser posicionada globalmente para o módulo em execução (Pool.map é "dependente de posicionamento"), caso contrário, eu a localizaria internamente.

aqui está o código da minha essência (vou adicionar mais testes de função de pandas lá):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Abaixo está um código de teste para uma aplicação paralela com tqdm "progress_apply".

from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

Na saída, você pode ver 1 barra de progresso para execução sem paralelização e barras de progresso por núcleo ao executar com paralelização. Há um pequeno hickup e, às vezes, o restante dos núcleos aparece de uma só vez, mas mesmo assim acho útil, pois você obtém as estatísticas de progresso por núcleo (ele / s e total de registros, por ex)

insira a descrição da imagem aqui

Obrigado @abcdaa por esta ótima biblioteca!

mork
fonte
1
Obrigado @mork - fique à vontade para adicionar ao github.com/tqdm/tqdm/wiki/How-to-make-a-great-Progress-Bar ou crie uma nova página em github.com/tqdm/tqdm/wiki
casper. dcl 21/01
Obrigado, mas tive que alterar esta parte: try: splits = np.array_split(df[subset], njobs) except ValueError: splits = np.array_split(df, njobs)devido à exceção KeyError em vez de ValueError, altere para Exception para lidar com todos os casos.
Marius
Obrigado @mork - esta resposta deve ser maior.
Andy
5

Você pode fazer isso facilmente com um decorador

from functools import wraps 

def logging_decorator(func):

    @wraps
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print "The function I modify has been called {0} times(s).".format(
              wrapper.count)
        func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)

basta usar a função modified_ (e altere quando quiser imprimir)

Jeff
fonte
1
Um aviso óbvio é que isso diminuirá sua função! Você pode até atualizá-lo com o progresso stackoverflow.com/questions/5426546/…, por exemplo, count / len como porcentagem.
Andy Hayden
sim - você terá de ordem (número de grupos), então dependendo do que o gargalo é isso pode fazer a diferença
Jeff
talvez a coisa mais intuitiva a fazer seja agrupar isso em uma logged_apply(g, func)função, na qual você tenha acesso à ordem e possa registrar desde o início.
Andy Hayden
Fiz o acima na minha resposta, também atualização percentual atrevida. Na verdade, não consegui fazer o seu funcionar ... acho que com a parte de baixo. Se você usá-lo para a aplicação, não é tão importante assim.
Andy Hayden
1

Eu mudei a resposta de Jeff , para incluir um total, para que você possa acompanhar o progresso e uma variável para apenas imprimir cada X iterações (isso realmente melhora o desempenho por um monte, se o "print_at" é razoavelmente elevado)

def count_wrapper(func,total, print_at):

    def wrapper(*args):
        wrapper.count += 1
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at

    return wrapper

a função clear_output () é de

from IPython.core.display import clear_output

se não no IPython, a resposta de Andy Hayden faz isso sem ele

Filipe Silva
fonte