Fazer Pandas DataFrame apply () usar todos os núcleos?

98

Em agosto de 2017, o Pandas DataFame.apply () infelizmente ainda está limitado a trabalhar com um único núcleo, o que significa que uma máquina com vários núcleos vai desperdiçar a maior parte do seu tempo de computação quando você executa df.apply(myfunc, axis=1).

Como você pode usar todos os seus núcleos para executar aplicativos em um dataframe em paralelo?

Roko Mijic
fonte

Respostas:

73

Você pode usar o swifterpacote:

pip install swifter

Ele funciona como um plugin para o pandas, permitindo que você reutilize a applyfunção:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

Ele descobrirá automaticamente a maneira mais eficiente de paralelizar a função, não importa se ela é vetorizada (como no exemplo acima) ou não.

Mais exemplos e uma comparação de desempenho estão disponíveis no GitHub. Observe que o pacote está em desenvolvimento ativo, então a API pode mudar.

Observe também que isso não funcionará automaticamente para colunas de string. Ao usar cordas, Swifter voltará a ser um Pandas “simples” apply, que não será paralelo. Nesse caso, mesmo forçá-lo a usar dasknão criará melhorias de desempenho, e seria melhor apenas dividir o conjunto de dados manualmente e paralelizar usandomultiprocessing .

slhck
fonte
1
Por pura curiosidade, existe uma maneira de limitar o número de núcleos que usa ao fazer a aplicação paralela? Tenho um servidor compartilhado, então se eu pegar todos os 32 núcleos, ninguém ficará feliz.
Maksim Khaitovich,
1
@MaximHaytovich não sei. O Swifter usa dask em segundo plano, então talvez respeite estas configurações: stackoverflow.com/a/40633117/435093 - caso contrário, recomendo abrir um problema no GitHub. O autor é muito sensível.
slhck
@slhck obrigado! Vou cavar um pouco mais. Parece não funcionar no servidor Windows de qualquer maneira - apenas trava sem fazer nada na tarefa de brinquedo
Maksim Khaitovich
você pode me ajudar a responder a isto: - stackoverflow.com/questions/53561794/…
ak3191
2
Para strings, basta adicionar allow_dask_on_strings(enable=True)assim: df.swifter.allow_dask_on_strings(enable=True).apply(some_function) Fonte: github.com/jmcarpenter2/swifter/issues/45
Sumit Sidana
99

A maneira mais simples é usar map_partitions do Dask . Você precisa dessas importações (você precisará pip install dask):

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get

e a sintaxe é

data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def myfunc(x,y,z, ...): return <whatever>

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)  

(Eu acredito que 30 é um número adequado de partições se você tiver 16 núcleos). Apenas para completar, eu cronometrei a diferença na minha máquina (16 núcleos):

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)

ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)  
def vectorized(): return myfunc(data['col1'], data['col2']  )

t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))

2.708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

0,010668013244867325

Dando um fator de 10 speedup indo de pandas aplicar para dask aplicar em partições. Claro, se você tem uma função que pode vetorizar, você deve - neste caso, a função ( y*(x**2+1)) é trivialmente vetorizada, mas há muitas coisas que são impossíveis de vetorizar.

Roko Mijic
fonte
2
Bom saber, obrigado por postar. Você pode explicar por que escolheu 30 partições? O desempenho muda ao alterar este valor?
Andrew L
2
@AndrewL Presumo que cada partição seja servida por um processo separado e, com 16 núcleos, presumo que 16 ou 32 processos podem ser executados simultaneamente. Eu experimentei e o desempenho parece melhorar em até 32 partições, mas aumentos posteriores não tiveram nenhum efeito benéfico. Suponho que com uma máquina quad-core você desejaria 8 partições, etc. Observe que percebi alguma melhora entre 16 e 32, então acho que você realmente quer 2x $ NUM_PROCESSORS
Roko Mijic
8
A única coisa éThe get= keyword has been deprecated. Please use the scheduler= keyword instead with the name of the desired scheduler like 'threads' or 'processes'
palavras para o contrário
4
Para dask v0.20.0 e em diante, use ddata.map_partitions (lambda df: df.apply ((lambda row: myfunc (* row)), axis = 1)). Compute (scheduler = 'process'), ou um dos outras opções do planejador. O código atual exibe "TypeError: A palavra-chave get = foi removida. Use a palavra-chave scheduler = em vez do nome do planejador desejado como 'threads' ou 'processos'"
mork
1
Antes de fazer isso, certifique-se de que o dataframe não tenha índices duplicados à medida que é lançado ValueError: cannot reindex from a duplicate axis. Para contornar isso, você deve remover índices duplicados por df = df[~df.index.duplicated()]ou redefinir seus índices por df.reset_index(inplace=True).
Habib Karbasian
23

você pode tentar em pandarallelvez disso: Uma ferramenta simples e eficiente para paralelizar suas operações do pandas em todas as suas CPUs (no Linux e macOS)

  • A paralelização tem um custo (instanciar novos processos, enviar dados via memória compartilhada, etc ...), então a paralelização é eficiente apenas se a quantidade de cálculo para paralelizar for alta o suficiente. Por muito pouca quantidade de dados, usar paralelização nem sempre vale a pena.
  • As funções aplicadas NÃO devem ser funções lambda.
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)

consulte https://github.com/nalepae/pandarallel

G_KOBELIEF
fonte
Olá, não consigo resolver um problema, usando pandarallel há um erro: AttributeError: Não é possível selecionar o objeto local 'prepare_worker. <locals> .closure. <locals> .wrapper'. Você pode me ajudar com isso?
Alex Cam
@Alex Sry Não sou o desenvolvedor desse módulo. Como são seus códigos? Você pode tentar declarar suas "funções internas" como globais? (acho)
G_KOBELIEF
@AlexCam Sua função deve ser definida fora de outra função para que o python possa selecioná-la para multiprocessamento
Kenan
@G_KOBELIEF Com Python> 3.6, podemos usar a função lambda com pandaparallel
user110244
16

Se você deseja permanecer no python nativo:

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])

aplicará a função fem paralelo à coluna coldo dataframedf

Olivier Cruchant
fonte
Seguindo uma abordagem como essa, recebi um ValueError: Length of values does not match length of indexde __setitem__in pandas/core/frame.py. Não tenho certeza se fiz algo errado ou se atribuir a df['newcol']não é thread-safe.
Rattle de
2
Você pode gravar o pool.map em uma lista intermediária temp_result para permitir a verificação se o comprimento corresponde ao df e, em seguida, fazer um df ['newcol'] = temp_result?
Olivier Cruchant de
você quer dizer criar a nova coluna? o que você usaria?
Olivier Cruchant,
sim, atribuindo o resultado do mapa à nova coluna do dataframe. O map não retorna uma lista do resultado de cada trecho enviado para a função f? Então, o que acontece quando você atribui isso à coluna 'newcol? Usando Pandas e Python 3
Mina
Na verdade, funciona muito bem! Você experimentou? Ele cria uma lista do mesmo comprimento do df, mesma ordem que o que foi enviado. Ele literalmente faz c2 = f (c1) de forma paralela. Não há maneira mais simples de multiprocessar em python. Em termos de desempenho, parece que Ray pode fazer coisas boas também (para a datatascience.com/… ), mas não é tão maduro e a instalação nem sempre ocorre sem problemas na minha experiência
Olivier Cruchant
1

Aqui está um exemplo de transformador de base sklearn, no qual os pandas se aplicam é paralelizado

import multiprocessing as mp
from sklearn.base import TransformerMixin, BaseEstimator

class ParllelTransformer(BaseEstimator, TransformerMixin):
    def __init__(self,
                 n_jobs=1):
        """
        n_jobs - parallel jobs to run
        """
        self.variety = variety
        self.user_abbrevs = user_abbrevs
        self.n_jobs = n_jobs
    def fit(self, X, y=None):
        return self
    def transform(self, X, *_):
        X_copy = X.copy()
        cores = mp.cpu_count()
        partitions = 1

        if self.n_jobs <= -1:
            partitions = cores
        elif self.n_jobs <= 0:
            partitions = 1
        else:
            partitions = min(self.n_jobs, cores)

        if partitions == 1:
            # transform sequentially
            return X_copy.apply(self._transform_one)

        # splitting data into batches
        data_split = np.array_split(X_copy, partitions)

        pool = mp.Pool(cores)

        # Here reduce function - concationation of transformed batches
        data = pd.concat(
            pool.map(self._preprocess_part, data_split)
        )

        pool.close()
        pool.join()
        return data
    def _transform_part(self, df_part):
        return df_part.apply(self._transform_one)
    def _transform_one(self, line):
        # some kind of transformations here
        return line

para obter mais informações, consulte https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8

Maxim Balatsko
fonte