Estou lidando com um DataFrame do Pandas bastante grande - meu conjunto de dados se parece com a seguinte df
configuração:
import pandas as pd
import numpy as np
#--------------------------------------------- SIZING PARAMETERS :
R1 = 20 # .repeat( repeats = R1 )
R2 = 10 # .repeat( repeats = R2 )
R3 = 541680 # .repeat( repeats = [ R3, R4 ] )
R4 = 576720 # .repeat( repeats = [ R3, R4 ] )
T = 55920 # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 = np.arange( 0, 2883600, 100 ) # ~ 20x re-used
#--------------------------------------------- DataFrame GENERATION :
df = pd.DataFrame.from_dict(
{ 'measurement_id': np.repeat( [0, 1], repeats = [ R3, R4 ] ),
'time':np.concatenate( [ np.repeat( A1, repeats = R1 ),
np.repeat( A2, repeats = R1 ) ] ),
'group': np.tile( np.repeat( [0, 1], repeats = R2 ), T ),
'object': np.tile( np.arange( 0, R1 ), T )
}
)
#--------------------------------------------- DataFrame RE-PROCESSING :
df = pd.concat( [ df,
df \
.groupby( ['measurement_id', 'time', 'group'] ) \
.apply( lambda x: np.random.uniform( 0, 100, 10 ) ) \
.explode() \
.astype( 'float' ) \
.to_frame( 'var' ) \
.reset_index( drop = True )
], axis = 1
)
Nota: Com o objetivo de ter um exemplo mínimo, ele pode ser facilmente subconjunto (por exemplo, com df.loc[df['time'] <= 400, :]
), mas como eu simulo os dados de qualquer maneira, pensei que o tamanho original daria uma visão geral melhor.
Para cada grupo definido por ['measurement_id', 'time', 'group']
, preciso chamar a seguinte função:
from sklearn.cluster import SpectralClustering
from pandarallel import pandarallel
def cluster( x, index ):
if len( x ) >= 2:
data = np.asarray( x )[:, np.newaxis]
clustering = SpectralClustering( n_clusters = 5,
random_state = 42
).fit( data )
return pd.Series( clustering.labels_ + 1, index = index )
else:
return pd.Series( np.nan, index = index )
Para melhorar o desempenho, tentei duas abordagens:
Pacote Pandarallel
A primeira abordagem foi paralelizar os cálculos usando o pandarallel
pacote:
pandarallel.initialize( progress_bar = True )
df \
.groupby( ['measurement_id', 'time', 'group'] ) \
.parallel_apply( lambda x: cluster( x['var'], x['object'] ) )
No entanto, isso parece subótimo, pois consome muita RAM e nem todos os núcleos são usados nos cálculos (mesmo apesar de especificar explicitamente o número de núcleos no pandarallel.initialize()
método). Além disso, algumas vezes os cálculos são encerrados com vários erros, embora eu não tenha tido a chance de encontrar uma razão para isso (possivelmente falta de RAM?).
PySpark Pandas UDF
Também experimentei um UDF do Spark Pandas, embora eu seja totalmente novo no Spark. Aqui está a minha tentativa:
import findspark; findspark.init()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
spark = SparkSession.builder.master( "local" ).appName( "test" ).config( conf = SparkConf() ).getOrCreate()
df = spark.createDataFrame( df )
@pandas_udf( StructType( [StructField( 'id', IntegerType(), True )] ), functionType = PandasUDFType.GROUPED_MAP )
def cluster( df ):
if len( df['var'] ) >= 2:
data = np.asarray( df['var'] )[:, np.newaxis]
clustering = SpectralClustering( n_clusters = 5,
random_state = 42
).fit( data )
return pd.DataFrame( clustering.labels_ + 1,
index = df['object']
)
else:
return pd.DataFrame( np.nan,
index = df['object']
)
res = df \
.groupBy( ['id_half', 'frame', 'team_id'] ) \
.apply( cluster ) \
.toPandas()
Infelizmente, o desempenho também foi insatisfatório e, pelo que li sobre o tópico, isso pode ser apenas o ônus de usar a função UDF, escrita em Python, e a necessidade associada de converter todos os objetos Python em objetos Spark e vice-versa.
Então, aqui estão as minhas questões:
- Qualquer uma das minhas abordagens pode ser ajustada para eliminar possíveis gargalos e melhorar o desempenho? (por exemplo, configuração do PySpark, ajuste de operações abaixo do ideal etc.)
- Eles são melhores alternativas? Como eles se comparam às soluções fornecidas em termos de desempenho?
dask
(((então meu comentário é apenas um conselho para pesquisa.Respostas:
+1
por mencionar os custos indiretos adicionais da instalação para qualquer estratégia de computação. Isso sempre faz um ponto de equilíbrio, somente após o qual uma não-[SERIAL]
estratégia pode alcançar qualquer alegria benéfica de alguma[TIME]
aceleração de Domínio desejada (ainda que, se houver, normalmente os[SPACE]
custos de Domínio permitam ou permanecem viáveis - sim, RAM. (existência e acesso a um dispositivo, orçamento e outras restrições semelhantes do mundo real)Primeiro,
a verificação antes do voo, antes da decolagem
A nova formulação rígida da Lei da Amdahl atualmente pode incorporar essas duas
pSO + pTO
despesas gerais adicionais e reflete-as na previsão dos níveis possíveis de aceleração, incluindo o ponto de equilíbrio ponto, desde o qual pode se tornar significativo (no sentido de custos / efeito, eficiência) ficar paralelo.No entanto,
esse não é o nosso principal problema aqui .
Isto vem a seguir:
Em seguida,
dados os custos computacionais de
SpectralClustering()
, que aqui é usado para usar o kernel da Função Radial Boltzmann~ exp( -gamma * distance( data, data )**2 )
, parece não haver avanço da divisão dodata
-objeto sobre qualquer número de unidades de trabalho disjuntas, pois odistance( data, data )
componente -com, por definição, tem apenas que visite todos osdata
elementos (ref. os custos de comunicação de{ process | node }
topologias distribuídas com valor a qualquer valor são, por razões óbvias, muito ruins se não os piores casos de uso para{ process | node }
processamento distribuído, se não os antipadrões diretos (exceto para alguns tecidos de fato misteriosos, sem memória / sem estado, e ainda computando).Para analistas pedantes, sim - acrescente a isso (e já podemos dizer um estado ruim ) os custos de - novamente - qualquer processo de k-meios qualquer , aqui é sobre
O( N^( 1 + 5 * 5 ) )
isso, poisN ~ len( data ) ~ 1.12E6+
, contra o nosso desejo de ter algum processamento inteligente e rápido.E daí?
Embora os custos de instalação não sejam negligenciados, o aumento dos custos de comunicação quase certamente desabilitará qualquer melhoria ao usar as tentativas esboçadas acima para passar de um
[SERIAL]
fluxo de processo puro para alguma forma de orquestração justa -[CONCURRENT]
ou verdadeira[PARALLEL]
de algumas subunidades de trabalho , devido ao aumento das despesas gerais relacionadas a uma obrigação de implementar (um par tandem) de topologias de passagem de valor para qualquer valor.Se não fosse por eles?
Bem, isso soa como um oxímoro da Ciência da Computação - mesmo que fosse possível, os custos das distâncias pré-calculadas de qualquer para qualquer (o que levaria esses imensos
[TIME]
custos de complexidade do domínio "de antemão" (Onde? Como? outra latência inevitável, permitindo uma possível ocultação de latência por parte de um (incremento desconhecido até agora) de uma matriz de distância completa a qualquer momento?)) reposicionaria esses custos principalmente presentes em algum outro local[TIME]
- e -[SPACE]
Domínios, não os reduza.A única, que eu estou fora ciente até agora, é tentar, se o problema é possível obter re-formulado em outro, um QUBO-formulado, moda problema (ref .: Q uantum- U nconstrained- B inary- O ptimisation , a boa notícia é que as ferramentas para isso, uma base de conhecimento em primeira mão e uma experiência prática de solução de problemas existem e aumentam)
O desempenho é de tirar o fôlego - o problema formulado pelo QUBO tem um
O(1)
solucionador promissor (!) Em tempo constante (em[TIME]
-Domain) e um pouco restrito em[SPACE]
-Domain (onde os truques LLNL anunciados recentemente podem ajudar a evitar esse mundo físico, a implementação atual da QPU, a restrição de problemas tamanhos).fonte
Esta não é uma resposta, mas ...
Se você correr
(ou seja, apenas com o Pandas), você notará que já está usando vários núcleos. Isso ocorre porque
sklearn
usajoblib
por padrão para paralelizar o trabalho. Você pode trocar o agendador em favor do Dask e talvez obter mais eficiência ao compartilhar os dados entre os threads, mas enquanto o trabalho que você estiver fazendo estiver vinculado à CPU assim, não haverá nada que você possa fazer para acelerar isso.Em resumo, este é um problema de algoritmo: descubra o que você realmente precisa calcular, antes de tentar considerar diferentes estruturas para calculá-lo.
fonte
joblib
criados , que nada têm a ver com encadeamentos, menos com o compartilhamento? Obrigado pelo seu esclarecimento gentil dos argumentos.Não sou especialista
Dask
, mas forneço o seguinte código como linha de base:fonte