Aplicação eficiente de uma função a um DataFrame de pandas agrupados em paralelo

89

Freqüentemente, preciso aplicar uma função aos grupos de um muito grande DataFrame(de tipos de dados mistos) e gostaria de tirar proveito de vários núcleos.

Posso criar um iterador a partir dos grupos e usar o módulo de multiprocessamento, mas não é eficiente porque cada grupo e os resultados da função devem ser separados para envio de mensagens entre processos.

Existe alguma maneira de evitar a decapagem ou mesmo evitar a cópia do DataFramecompletamente? Parece que as funções de memória compartilhada dos módulos de multiprocessamento são limitadas a numpyarrays. Existem outras opções?

user2303
fonte
Pelo que eu sei, não há como compartilhar objetos arbitrários. Fico imaginando se a decapagem leva muito mais tempo do que o ganho com o multiprocessamento. Talvez você deva procurar a possibilidade de criar pacotes de trabalho maiores para cada processo para reduzir o tempo relativo de decapagem. Outra possibilidade seria usar o multiprocessamento ao criar os grupos.
Sebastian Werk
3
Eu faço algo assim, mas usando UWSGI, Flask e pré-bifurcação: carrego o dataframe do pandas em um processo, bifurco x vezes (tornando-o um objeto de memória compartilhada) e, em seguida, chamo esses processos de outro processo Python onde concato os resultados. atm Eu uso JSON como um processo de comunicação, mas isso está chegando (ainda altamente experimental): pandas.pydata.org/pandas-docs/dev/io.html#msgpack-experimental
Carst
A propósito, você já olhou para HDF5 com chunking? (HDF5 não é salvo para gravação simultânea, mas você também pode salvar em arquivos separados e no final concatenar coisas)
Carst
7
isso será direcionado para 0,14, consulte este problema: github.com/pydata/pandas/issues/5751
Jeff
4
@Jeff foi empurrado para 0,15 = (
pyCthon

Respostas:

12

Pelos comentários acima, parece que isso está planejado há pandasalgum tempo (há também um rosettaprojeto de aparência interessante que acabei de notar).

No entanto, até que todas as funcionalidades paralelas sejam incorporadas pandas, percebi que é muito fácil escrever ampliações paralelas eficientes e sem cópia de memória para pandasusar diretamente cython+ OpenMP e C ++.

Aqui está um pequeno exemplo de como escrever um grupo por soma paralelo, cujo uso é algo assim:

import pandas as pd
import para_group_demo

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)

e a saída é:

     sum
key     
0      6
1      11
2      4

Nota Sem dúvida, a funcionalidade deste exemplo simples fará parte do pandas. Algumas coisas, no entanto, serão mais naturais paralelizar em C ++ por algum tempo, e é importante estar ciente de como é fácil combinar isso em pandas.


Para fazer isso, escrevi uma extensão de arquivo de origem única simples cujo código segue.

Começa com algumas importações e definições de tipo

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

O unordered_maptipo C ++ é para somar por um único thread e o vectoré para somar por todos os threads.

Agora para a função sum. Ele começa com visualizações de memória digitadas para acesso rápido:

def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values

A função continua dividindo o semi-igualmente para os threads (aqui codificados para 4), e fazendo com que cada thread some as entradas em seu intervalo:

    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True): 
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)

Quando os threads são concluídos, a função mescla todos os resultados (de diferentes intervalos) em um único unordered_map:

    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)        

Tudo o que resta é criar um DataFramee retornar os resultados:

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df
Ami Tavory
fonte