Eu prefiro Python a Scala. Mas, como o Spark é originalmente escrito em Scala, eu esperava que meu código fosse executado mais rapidamente no Scala que na versão Python por razões óbvias.
Com essa suposição, pensei em aprender e escrever a versão Scala de algum código de pré-processamento muito comum para cerca de 1 GB de dados. Os dados são coletados da competição SpringLeaf no Kaggle . Apenas para fornecer uma visão geral dos dados (contém dimensões de 1936 e 145232 linhas). Os dados são compostos de vários tipos, por exemplo, int, float, string, boolean. Estou usando 6 núcleos de 8 para processamento Spark; é por isso que usei minPartitions=6
para que todo núcleo tenha algo a processar.
Código Scala
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = {
val vals = line.split(delim1)
(vals(0), vals(1))
}
val input4 = input3.map(toKeyVal)
def valsConcat(val1: String, val2: String): String = {
val1 + "," + val2
}
val input5 = input4.reduceByKey(valsConcat)
input5.saveAsTextFile("output")
Código Python
input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
input2 = input.mapPartitionsWithIndex(drop_first_line)
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
for e, val in enumerate(vals)]
return vals2
input3 = input2.flatMap(separate_cols)
def to_key_val(kv):
key, val = kv.split(DELIM_1)
return (key, val)
input4 = input3.map(to_key_val)
def vals_concat(v1, v2):
return v1 + ',' + v2
input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')
Scala Performance: Estágio 0 (38 mins), Estágio 1 (18 seg)
Python Performance Stage 0 (11 mins), Stage 1 (7 seg)
Ambos produzem diferentes gráficos de visualização do DAG (devido ao qual as duas imagens mostram diferentes funções do estágio 0 para Scala ( map
) e Python ( reduceByKey
))
Mas, essencialmente, ambos os códigos tentam transformar dados em (dimension_id, sequência de lista de valores) RDD e salvar em disco. A saída será usada para calcular várias estatísticas para cada dimensão.
Em termos de desempenho, o código Scala para dados reais como esse parece funcionar 4 vezes mais lento que a versão Python. Boas notícias para mim é que isso me deu uma boa motivação para permanecer no Python. A má notícia é que não entendi direito o porquê.
fonte
Respostas:
A resposta original discutindo o código pode ser encontrada abaixo.
Primeiro de tudo, você precisa distinguir entre diferentes tipos de API, cada um com suas próprias considerações de desempenho.
API RDD
(estruturas Python puras com orquestração baseada em JVM)
Este é o componente que será mais afetado pelo desempenho do código Python e pelos detalhes da implementação do PySpark. Embora seja improvável que o desempenho do Python seja um problema, há pelo menos alguns fatores que você deve considerar:
Executores baseados em processo (Python) versus executores baseados em encadeamento (single JVM multiple threads) (Scala). Cada executor Python é executado em seu próprio processo. Como efeito colateral, ele fornece um isolamento mais forte que o equivalente da JVM e algum controle sobre o ciclo de vida do executor, mas potencialmente significativamente maior uso de memória:
Desempenho do próprio código Python. De um modo geral, o Scala é mais rápido que o Python, mas varia de tarefa para tarefa. Além disso, você tem várias opções, incluindo JITs como Numba , extensões C ( Cython ) ou bibliotecas especializadas como Theano . Finalmente,
se você não usar o ML / MLlib (ou simplesmente a pilha NumPy), considere usar o PyPy como um intérprete alternativo. Veja SPARK-3094 .spark.python.worker.reuse
opção que pode ser usada para escolher entre dividir o processo do Python para cada tarefa e reutilizar o processo existente. A última opção parece útil para evitar a coleta cara de lixo (é mais uma impressão do que o resultado de testes sistemáticos), enquanto a primeira (padrão) é ideal para o caso de transmissões e importações caras.MLlib
(execução mista de Python e JVM)
As considerações básicas são praticamente as mesmas de antes, com alguns problemas adicionais. Enquanto as estruturas básicas usadas com o MLlib são objetos simples do Python RDD, todos os algoritmos são executados diretamente usando o Scala.
Isso significa um custo adicional de converter objetos Python em objetos Scala e vice-versa, aumento do uso de memória e algumas limitações adicionais que abordaremos mais adiante.
A partir de agora (Spark 2.x), a API baseada em RDD está em modo de manutenção e está programada para ser removida no Spark 3.0 .
API DataFrame e Spark ML
(Execução da JVM com código Python limitado ao driver)
Essa é provavelmente a melhor opção para tarefas padrão de processamento de dados. Como o código Python é limitado principalmente a operações lógicas de alto nível no driver, não deve haver diferença de desempenho entre o Python e o Scala.
Uma única exceção é o uso de UDFs Python em linhas, que são significativamente menos eficientes que seus equivalentes Scala. Embora exista alguma chance de melhorias (houve um desenvolvimento substancial no Spark 2.0.0), a maior limitação é a ida e volta completa entre a representação interna (JVM) e o interpretador Python. Se possível, você deve favorecer uma composição de expressões internas ( exemplo . O comportamento UDF do Python foi aprimorado no Spark 2.0.0, mas ainda é subótimo em comparação à execução nativa.
Isso
pode melhorar no futuro, melhorou significativamente com a introdução dos UDFs vetorizados (SPARK-21190 e outras extensões) , que usam o Arrow Streaming para troca eficiente de dados com desserialização de cópia zero. Para a maioria dos aplicativos, suas despesas gerais secundárias podem ser simplesmente ignoradas.Além disso, evite passar dados desnecessários entre
DataFrames
eRDDs
. Isso requer serialização e desserialização caras, sem mencionar a transferência de dados de e para o interpretador Python.Vale a pena notar que as chamadas Py4J têm uma latência bastante alta. Isso inclui chamadas simples como:
Normalmente, isso não importa (a sobrecarga é constante e não depende da quantidade de dados), mas no caso de aplicativos em tempo real, você pode considerar o cache / reutilização de wrappers Java.
Conjuntos de dados GraphX e Spark
Por enquanto (Spark
GraphX1.62.1), nenhum deles fornece a API do PySpark; portanto, você pode dizer que o PySpark é infinitamente pior que o Scala.Na prática, o desenvolvimento do GraphX parou quase completamente e o projeto está atualmente no modo de manutenção com os tickets JIRA relacionados fechados, como não serão corrigidos . A biblioteca GraphFrames fornece uma biblioteca alternativa de processamento de gráficos com ligações Python.
Conjunto de dadosSubjetivamente, não há muito lugar para se digitar estaticamente
Datasets
no Python e mesmo que a implementação atual do Scala seja muito simplista e não ofereça os mesmos benefícios de desempenho queDataFrame
.Transmissão
Pelo que vi até agora, recomendo fortemente o uso do Scala sobre Python. Pode mudar no futuro se o PySpark obtiver suporte para fluxos estruturados, mas agora a API Scala parece ser muito mais robusta, abrangente e eficiente. Minha experiência é bastante limitada.
O streaming estruturado no Spark 2.x parece reduzir a diferença entre os idiomas, mas por enquanto ainda está nos seus primeiros dias. No entanto, a API baseada em RDD já é referenciada como "streaming herdado" na documentação do Databricks (data de acesso 2017-03-03)), portanto é razoável esperar mais esforços de unificação.
Considerações de não desempenho
Paridade de recursoNem todos os recursos do Spark são expostos pela API do PySpark. Verifique se as peças necessárias já estão implementadas e tente entender as possíveis limitações.
É particularmente importante quando você usa o MLlib e contextos mistos semelhantes (consulte Chamando a Função Java / Scala de uma Tarefa ). Para ser justo, algumas partes da API do PySpark, como
Design da APImllib.linalg
, fornecem um conjunto de métodos mais abrangente que o Scala.A API do PySpark reflete de perto o seu equivalente do Scala e, como tal, não é exatamente Pythonic. Isso significa que é muito fácil mapear entre idiomas, mas, ao mesmo tempo, o código Python pode ser significativamente mais difícil de entender.
Arquitetura complexaO fluxo de dados do PySpark é relativamente complexo comparado à pura execução da JVM. É muito mais difícil argumentar sobre programas ou depuração do PySpark. Além disso, pelo menos o entendimento básico do Scala e da JVM em geral é praticamente necessário.
Spark 2.xe alémA mudança contínua para a
Dataset
API, com a API RDD congelada, traz oportunidades e desafios para os usuários do Python. Enquanto partes de alto nível da API são muito mais fáceis de expor no Python, os recursos mais avançados são praticamente impossíveis de serem usados diretamente .Além disso, as funções nativas do Python continuam sendo cidadãos de segunda classe no mundo SQL. Esperamos que isso melhore no futuro com a serialização do Apache Arrow ( os esforços atuais direcionam dados,
collection
mas o UDF serde é um objetivo a longo prazo ).Para projetos fortemente dependentes da base de código Python, alternativas puras de Python (como Dask ou Ray ) podem ser uma alternativa interessante.
Não precisa ser um vs. o outro
A API Spark DataFrame (SQL, Dataset) fornece uma maneira elegante de integrar o código Scala / Java no aplicativo PySpark. Você pode usar
DataFrames
para expor dados a um código JVM nativo e ler novamente os resultados. Eu expliquei algumas opções em outro lugar e você pode encontrar um exemplo prático de ida e volta em Python-Scala em Como usar uma classe Scala no Pyspark .Ele pode ser aumentado ainda mais com a introdução de tipos definidos pelo usuário (consulte Como definir o esquema para o tipo personalizado no Spark SQL? ).
O que há de errado com o código fornecido na pergunta
(Aviso: ponto de vista Pythonista. Provavelmente perdi alguns truques do Scala)
Primeiro de tudo, há uma parte no seu código que não faz sentido. Se você já possui
(key, value)
pares criados usandozipWithIndex
ouenumerate
qual é o objetivo de criar uma string apenas para dividi-la logo depois?flatMap
não funciona recursivamente, então você pode simplesmente gerar tuplas e pular as seguintesmap
.Outra parte que acho problemática é
reduceByKey
. De um modo geral,reduceByKey
é útil se a aplicação da função agregada puder reduzir a quantidade de dados que precisam ser embaralhados. Como você simplesmente concatena as strings, não há nada a ganhar aqui. Ignorando coisas de baixo nível, como o número de referências, a quantidade de dados que você precisa transferir é exatamente a mesma que paragroupByKey
.Normalmente eu não pensaria nisso, mas até onde eu sei, é um gargalo no seu código Scala. A união de cadeias de caracteres na JVM é uma operação bastante cara (consulte, por exemplo: A concatenação de cadeias de caracteres no scala é tão cara quanto em Java? ). Isso significa que algo assim,
_.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)
equivalente aoinput4.reduceByKey(valsConcat)
seu código, não é uma boa ideia.Se você quer evitar
groupByKey
que você pode tentar usaraggregateByKey
comStringBuilder
. Algo semelhante a isso deve funcionar:mas duvido que valha toda a confusão.
Tendo em mente o que foi dito acima, reescrevi seu código da seguinte maneira:
Scala :
Python :
Resultados
No
local[6]
modo (CPU Intel (R) Xeon (E) E3-1245 V2 a 3.40GHz) com 4 GB de memória por executor, é necessário (n = 3):Tenho certeza de que a maior parte desse tempo é gasta em embaralhar, serializar, desserializar e outras tarefas secundárias. Apenas por diversão, aqui está um código ingênuo de thread único no Python que executa a mesma tarefa nesta máquina em menos de um minuto:
fonte
Extensão às respostas acima -
O Scala prova-se mais rápido em muitos aspectos do que o python, mas existem algumas razões válidas pelas quais o python está se tornando mais popular que o scala, vamos ver alguns deles -
O Python para Apache Spark é muito fácil de aprender e usar. No entanto, essa não é a única razão pela qual o Pyspark é uma escolha melhor que o Scala. Tem mais.
A API Python para Spark pode ser mais lenta no cluster, mas, no final, os cientistas de dados podem fazer muito mais com isso em comparação com o Scala. A complexidade de Scala está ausente. A interface é simples e abrangente.
Falar sobre a legibilidade do código, manutenção e familiaridade com a API do Python para Apache Spark é muito melhor que o Scala.
O Python vem com várias bibliotecas relacionadas ao aprendizado de máquina e processamento de linguagem natural. Isso ajuda na análise de dados e também possui estatísticas muito maduras e testadas pelo tempo. Por exemplo, numpy, pandas, scikit-learn, seaborn e matplotlib.
Nota: A maioria dos cientistas de dados usa uma abordagem híbrida, na qual utiliza o melhor das duas APIs.
Por fim, a comunidade Scala geralmente acaba sendo muito menos útil para os programadores. Isso faz do Python um aprendizado muito valioso. Se você possui experiência suficiente com qualquer linguagem de programação estaticamente tipada como Java, pode parar de se preocupar em não usar o Scala por completo.
fonte