Existe uma maneira simples de executar pandas.DataFrame.isin em paralelo?

25

Eu tenho um programa de modelagem e pontuação que faz uso pesado da DataFrame.isinfunção dos pandas, pesquisando nas listas de registros "like" do Facebook de usuários individuais para cada um dos milhares de páginas específicas. Essa é a parte que consome mais tempo do programa, mais do que as peças de modelagem ou pontuação, simplesmente porque é executada apenas em um núcleo, enquanto o restante é executado em algumas dezenas simultaneamente.

Embora eu saiba que poderia dividir manualmente o quadro de dados em partes e executar a operação em paralelo, existe alguma maneira direta de fazer isso automaticamente? Em outras palavras, existe algum tipo de pacote que reconheça que eu estou executando uma operação facilmente delegada e a distribua automaticamente? Talvez isso seja pedir demais, mas eu já fiquei bastante surpreso com o que já está disponível no Python, então acho que vale a pena perguntar.

Qualquer outra sugestão sobre como isso pode ser feito (mesmo que não seja por algum pacote mágico de unicórnio!) Também seria apreciada. Principalmente, apenas tentando encontrar uma maneira de economizar 15 a 20 minutos por execução sem gastar uma quantidade igual de tempo codificando a solução.

Therriault
fonte
Qual é o tamanho da sua lista de valores? Você já tentou passar isso como um conjunto? Para paralelismo, você pode estar interessado no Joblib. É fácil de usar e pode acelerar os cálculos. Use-o com grandes pedaços de dados.
oao
Outra opção é reformular o seu problema como uma associação. As associações são muito mais rápidas no Pandas stackoverflow.com/questions/23945493/…
Brian Spiering 20/01
Ainda outra opção é usar o np.in1d, que também é mais rápido stackoverflow.com/questions/21738882/fast-pandas-filtering
Brian Spiering

Respostas:

8

Infelizmente, a paralelização ainda não está implementada nos pandas. Você pode ingressar nesta questão do github se quiser participar do desenvolvimento desse recurso.

Como não conheço nenhum "pacote unicórnio mágico" para esse propósito, a melhor coisa será escrever sua própria solução. Mas se você ainda não quer gastar tempo com isso e quer aprender algo novo - pode experimentar os dois métodos incorporados ao MongoDB (estrutura de redução e agg de mapa). Consulte mongodb_agg_framework .

Stanpol
fonte
6

Eu acho que sua melhor aposta seria rosetta . Estou achando extremamente útil e fácil. Verifique seus métodos de pandas .

Você pode obtê-lo por pip .

dmvianna
fonte
Eu recomendo obter rosetta indo diretamente para o GitHub. Isso garante que você obtenha a versão mais recente. github.com/columbia-applied-data-science/rosetta
Ian Langmore
0

Existe uma versão mais comum dessa pergunta em relação à paralelização em função de aplicação de pandas - portanto, essa é uma pergunta refrescante :)

Primeiro , quero mencionar mais rapidamente, já que você pediu uma solução "empacotada", e ela aparece na maioria das questões relacionadas à paralelização de pandas.

Mas .. ainda gostaria de compartilhar meu código de essência pessoal, pois depois de vários anos trabalhando com o DataFrame, nunca encontrei uma solução 100% de paralelização (principalmente para a função de aplicação) e sempre tive que voltar para o meu " código manual ".

Graças a você, tornei mais genérico o suporte a qualquer método DataFrame (teoricamente) por seu nome (para que você não precise manter as versões de isin, apply, etc.).

Eu testei nas funções "isin", "apply" e "isna" usando o python 2.7 e 3.6. São menos de 20 linhas, e eu segui a convenção de nomes de pandas como "subconjunto" e "njobs".

Eu também adicionei uma comparação de tempo com o código equivalente dask para "isin" e parece ~ X2 vezes mais lento que essa essência.

Inclui 2 funções:

df_multi_core - este é o que você chama. Aceita:

  1. Seu objeto df
  2. O nome da função que você deseja chamar
  3. O subconjunto de colunas em que a função pode ser executada (ajuda a reduzir o tempo / memória)
  4. O número de tarefas a serem executadas em paralelo (-1 ou omitir para todos os núcleos)
  5. Quaisquer outros kwargs aceitos pela função do df (como "eixo")

_df_split - esta é uma função auxiliar interna que deve ser posicionada globalmente para o módulo em execução (Pool.map é "dependente do posicionamento"), caso contrário, eu a localizaria internamente.

aqui está o código da minha essência (vou adicionar mais testes de função de pandas lá):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Abaixo está um código de teste para um isin paralelo , comparando o desempenho nativo, de múltiplos núcleos e dask. Em uma máquina I7 com 8 núcleos físicos, obtive cerca de 4 vezes mais velocidade. Eu adoraria ouvir o que você obtém em seus dados reais!

from time import time

if __name__ == '__main__': 
    sep = '-' * 50

    # isin test
    N = 10000000
    df = pd.DataFrame({'c1': np.random.randint(low=1, high=N, size=N), 'c2': np.arange(N)})
    lookfor = np.random.randint(low=1, high=N, size=1000000)

    print('{}\ntesting pandas isin on {}\n{}'.format(sep, df.shape, sep))
    t1 = time()
    print('result\n{}'.format(df.isin(lookfor).sum()))
    t2 = time()
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    res = df_multi_core(df=df, df_f_name='isin', subset=['c1'], njobs=-1, values=lookfor)
    print('result\n{}'.format(res.sum()))
    t4 = time()
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))


    t5 = time()
    ddata = dd.from_pandas(df, npartitions=njobs)
    res = ddata.map_partitions(lambda df: df.apply(apply_f, axis=1)).compute(scheduler='processes')
    t6 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for dask implementation {}\n{}'.format(round(t6 - t5, 2), sep))

--------------------------------------------------
testing pandas isin on (10000000, 2)
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for native implementation 3.87
--------------------------------------------------
result
c1    953213
dtype: int64
time for multi core implementation 1.16
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for dask implementation 2.88
mork
fonte
@Therriault Adicionei uma comparação dask com isin- parece que o snippet de código é mais eficaz com 'isin' - ~ X1.75 vezes mais rápido que o dask (comparado com a applyfunção que ficou 5% mais rápida que o dask)
mork