Então, digamos que você tenha um processo Python que colete dados em tempo real com cerca de 500 linhas por segundo (isso pode ser paralelizado ainda mais para reduzir para cerca de 50 ps) de um sistema de filas e anexá-lo a DataFrame
:
rq = MyRedisQueue(..)
df = pd.DataFrame()
while 1:
recv = rq.get(block=True)
# some converting
df.append(recv, ignore_index = True)
Agora, a pergunta é: como utilizar as CPUs com base nesses dados? Portanto, estou plenamente consciente das limitações do GIL , e olhou para multiprocessamento Gerente de namespace , aqui também , mas parece que existem algumas desvantagens em relação à latência na trama de dados centerally espera . Antes de escavar para ele, eu também tentou pool.map
que eu que reconheceu a aplicar pickle
entre os processos, que é maneira de retardar e tem muita sobrecarga.
Então, depois de tudo isso, eu finalmente me pergunto, como (se) uma inserção de 500 linhas por segundo (ou mesmo 50 linhas por segundo) pode ser transferida para diferentes processos com algum tempo de CPU restante para aplicar estatísticas e heurísticas nos dados na criança processos?
Talvez seja melhor implementar um soquete tcp personalizado ou sistema de filas entre os dois processos? Ou existem algumas implementações pandas
ou outras bibliotecas para realmente permitir um acesso rápido ao único grande quadro de dados no processo pai ? Eu amo pandas!
Respostas:
Antes de começarmos, devo dizer que você não nos contou muito sobre seu código, mas tem esse objetivo de transferir apenas 50/500 novas linhas por segundo para o processo filho e tentar criar esse grande
DataFrame
processo filho.Estou trabalhando em um projeto exatamente igual ao seu. O Python possui muitas implementações de IPC, como
Pipe
eQueue
como você sabe.Shared Memory
solução pode ser problemática em muitos casos, a documentação oficial do AFAIK python alertou sobre o uso de memórias compartilhadas.Na minha experiência, a melhor maneira de transformar dados entre apenas dois processos é
Pipe
, para que você possa selecionar o DataFrame e enviá-lo para o outro ponto final da conexão. Eu sugiro fortemente que você eviteTCP
sockets (AF_INET
) no seu caso.Os pandas
DataFrame
não podem ser transformados em outro processo sem serem conservados em conserva. por isso, recomendo que você transfira os dados brutos como tiposdict
internos, como no lugar do DataFrame. Isso pode tornar a pickle e a remoção mais rápidas e também possui menos pegadas de memória.fonte
Shared Memory
área - que, esperançosamente, pode lidar com muitos processos de leitura dos processos filhos, enquanto os principais processos estão anexados a ele - poderia fazê-lo pelo que vejo, se dificilmente restringir os acessos de gravação ao processo pai.shared memory
está em algum tipo deblock state
escrita para ele? Isso significaria que os processos filho não podem ler o DataFrame, enquanto o processo pai é anexado a ele (o que quase sempre será).Shared Memory
solução, sincronize os processos filhos com o processo do fornecedor. Isso poderia ser feito pormultiprocessing.Lock
: docs.python.org/3/library/…Paralelização em
pandas
é provavelmente melhor tratada por outro mecanismo.Dê uma olhada no projeto Koalas da Databricks ou do DataFrame da Dask .
fonte
df.map_partitions
) e, em seguida,groupby
indexe (importante para o desempenho no Dask), salve como CSV.Uma solução simples seria separar o processo em dois estágios diferentes. Use o Asyncio para receber os dados de maneira não-bloqueante e executar suas transformações dentro disso. O segundo estágio consumiria uma fila assíncrona para criar o DataFrame. Isso pressupõe que você não precisa do DataFrame disponível para um processo diferente enquanto estiver recebendo dados da Fila Redis.
Aqui está um exemplo de construção de um modelo de produtor / consumidor com o Asyncio
fonte