Desempenho do Spark para Scala vs Python

178

Eu prefiro Python a Scala. Mas, como o Spark é originalmente escrito em Scala, eu esperava que meu código fosse executado mais rapidamente no Scala que na versão Python por razões óbvias.

Com essa suposição, pensei em aprender e escrever a versão Scala de algum código de pré-processamento muito comum para cerca de 1 GB de dados. Os dados são coletados da competição SpringLeaf no Kaggle . Apenas para fornecer uma visão geral dos dados (contém dimensões de 1936 e 145232 linhas). Os dados são compostos de vários tipos, por exemplo, int, float, string, boolean. Estou usando 6 núcleos de 8 para processamento Spark; é por isso que usei minPartitions=6para que todo núcleo tenha algo a processar.

Código Scala

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Código Python

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Scala Performance: Estágio 0 (38 mins), Estágio 1 (18 seg) insira a descrição da imagem aqui

Python Performance Stage 0 (11 mins), Stage 1 (7 seg) insira a descrição da imagem aqui

Ambos produzem diferentes gráficos de visualização do DAG (devido ao qual as duas imagens mostram diferentes funções do estágio 0 para Scala ( map) e Python ( reduceByKey))

Mas, essencialmente, ambos os códigos tentam transformar dados em (dimension_id, sequência de lista de valores) RDD e salvar em disco. A saída será usada para calcular várias estatísticas para cada dimensão.

Em termos de desempenho, o código Scala para dados reais como esse parece funcionar 4 vezes mais lento que a versão Python. Boas notícias para mim é que isso me deu uma boa motivação para permanecer no Python. A má notícia é que não entendi direito o porquê.

Mrityunjay
fonte
8
Talvez este é o código e dependente como eu chegar a outro resultado, que a aplicação apache python faísca é mais lento do scala, quando soma um bilhão termos da fórmula Leibniz para π
Paul
3
Pergunta interessante! Aliás, também dê uma olhada aqui: emptypipes.org/2015/01/17/python-vs-scala-vs-spark Quanto mais núcleos você tiver, menos poderá ver diferenças entre os idiomas.
Markon
Você já pensou em aceitar a resposta existente?
10465355 diz Reinstate Monica

Respostas:

357

A resposta original discutindo o código pode ser encontrada abaixo.


Primeiro de tudo, você precisa distinguir entre diferentes tipos de API, cada um com suas próprias considerações de desempenho.

API RDD

(estruturas Python puras com orquestração baseada em JVM)

Este é o componente que será mais afetado pelo desempenho do código Python e pelos detalhes da implementação do PySpark. Embora seja improvável que o desempenho do Python seja um problema, há pelo menos alguns fatores que você deve considerar:

  • Sobrecarga de comunicação JVM. Praticamente todos os dados que chegam e saem do executor Python precisam ser passados ​​por um soquete e um trabalhador da JVM. Embora seja uma comunicação local relativamente eficiente, ainda não é gratuita.
  • Executores baseados em processo (Python) versus executores baseados em encadeamento (single JVM multiple threads) (Scala). Cada executor Python é executado em seu próprio processo. Como efeito colateral, ele fornece um isolamento mais forte que o equivalente da JVM e algum controle sobre o ciclo de vida do executor, mas potencialmente significativamente maior uso de memória:

    • pegada de memória do intérprete
    • pegada das bibliotecas carregadas
    • transmissão menos eficiente (cada processo requer sua própria cópia de uma transmissão)
  • Desempenho do próprio código Python. De um modo geral, o Scala é mais rápido que o Python, mas varia de tarefa para tarefa. Além disso, você tem várias opções, incluindo JITs como Numba , extensões C ( Cython ) ou bibliotecas especializadas como Theano . Finalmente, se você não usar o ML / MLlib (ou simplesmente a pilha NumPy) , considere usar o PyPy como um intérprete alternativo. Veja SPARK-3094 .

  • A configuração do PySpark fornece a spark.python.worker.reuseopção que pode ser usada para escolher entre dividir o processo do Python para cada tarefa e reutilizar o processo existente. A última opção parece útil para evitar a coleta cara de lixo (é mais uma impressão do que o resultado de testes sistemáticos), enquanto a primeira (padrão) é ideal para o caso de transmissões e importações caras.
  • A contagem de referência, usada como o método de coleta de lixo de primeira linha no CPython, funciona muito bem com cargas de trabalho típicas do Spark (processamento semelhante ao fluxo, sem ciclos de referência) e reduz o risco de longas pausas no GC.

MLlib

(execução mista de Python e JVM)

As considerações básicas são praticamente as mesmas de antes, com alguns problemas adicionais. Enquanto as estruturas básicas usadas com o MLlib são objetos simples do Python RDD, todos os algoritmos são executados diretamente usando o Scala.

Isso significa um custo adicional de converter objetos Python em objetos Scala e vice-versa, aumento do uso de memória e algumas limitações adicionais que abordaremos mais adiante.

A partir de agora (Spark 2.x), a API baseada em RDD está em modo de manutenção e está programada para ser removida no Spark 3.0 .

API DataFrame e Spark ML

(Execução da JVM com código Python limitado ao driver)

Essa é provavelmente a melhor opção para tarefas padrão de processamento de dados. Como o código Python é limitado principalmente a operações lógicas de alto nível no driver, não deve haver diferença de desempenho entre o Python e o Scala.

Uma única exceção é o uso de UDFs Python em linhas, que são significativamente menos eficientes que seus equivalentes Scala. Embora exista alguma chance de melhorias (houve um desenvolvimento substancial no Spark 2.0.0), a maior limitação é a ida e volta completa entre a representação interna (JVM) e o interpretador Python. Se possível, você deve favorecer uma composição de expressões internas ( exemplo . O comportamento UDF do Python foi aprimorado no Spark 2.0.0, mas ainda é subótimo em comparação à execução nativa.

Isso pode melhorar no futuro , melhorou significativamente com a introdução dos UDFs vetorizados (SPARK-21190 e outras extensões) , que usam o Arrow Streaming para troca eficiente de dados com desserialização de cópia zero. Para a maioria dos aplicativos, suas despesas gerais secundárias podem ser simplesmente ignoradas.

Além disso, evite passar dados desnecessários entre DataFramese RDDs. Isso requer serialização e desserialização caras, sem mencionar a transferência de dados de e para o interpretador Python.

Vale a pena notar que as chamadas Py4J têm uma latência bastante alta. Isso inclui chamadas simples como:

from pyspark.sql.functions import col

col("foo")

Normalmente, isso não importa (a sobrecarga é constante e não depende da quantidade de dados), mas no caso de aplicativos em tempo real, você pode considerar o cache / reutilização de wrappers Java.

Conjuntos de dados GraphX ​​e Spark

Por enquanto (Spark 1.6 2.1), nenhum deles fornece a API do PySpark; portanto, você pode dizer que o PySpark é infinitamente pior que o Scala.

GraphX

Na prática, o desenvolvimento do GraphX ​​parou quase completamente e o projeto está atualmente no modo de manutenção com os tickets JIRA relacionados fechados, como não serão corrigidos . A biblioteca GraphFrames fornece uma biblioteca alternativa de processamento de gráficos com ligações Python.

Conjunto de dados

Subjetivamente, não há muito lugar para se digitar estaticamente Datasetsno Python e mesmo que a implementação atual do Scala seja muito simplista e não ofereça os mesmos benefícios de desempenho que DataFrame.

Transmissão

Pelo que vi até agora, recomendo fortemente o uso do Scala sobre Python. Pode mudar no futuro se o PySpark obtiver suporte para fluxos estruturados, mas agora a API Scala parece ser muito mais robusta, abrangente e eficiente. Minha experiência é bastante limitada.

O streaming estruturado no Spark 2.x parece reduzir a diferença entre os idiomas, mas por enquanto ainda está nos seus primeiros dias. No entanto, a API baseada em RDD já é referenciada como "streaming herdado" na documentação do Databricks (data de acesso 2017-03-03)), portanto é razoável esperar mais esforços de unificação.

Considerações de não desempenho

Paridade de recurso

Nem todos os recursos do Spark são expostos pela API do PySpark. Verifique se as peças necessárias já estão implementadas e tente entender as possíveis limitações.

É particularmente importante quando você usa o MLlib e contextos mistos semelhantes (consulte Chamando a Função Java / Scala de uma Tarefa ). Para ser justo, algumas partes da API do PySpark, como mllib.linalg, fornecem um conjunto de métodos mais abrangente que o Scala.

Design da API

A API do PySpark reflete de perto o seu equivalente do Scala e, como tal, não é exatamente Pythonic. Isso significa que é muito fácil mapear entre idiomas, mas, ao mesmo tempo, o código Python pode ser significativamente mais difícil de entender.

Arquitetura complexa

O fluxo de dados do PySpark é relativamente complexo comparado à pura execução da JVM. É muito mais difícil argumentar sobre programas ou depuração do PySpark. Além disso, pelo menos o entendimento básico do Scala e da JVM em geral é praticamente necessário.

Spark 2.xe além

A mudança contínua para a DatasetAPI, com a API RDD congelada, traz oportunidades e desafios para os usuários do Python. Enquanto partes de alto nível da API são muito mais fáceis de expor no Python, os recursos mais avançados são praticamente impossíveis de serem usados diretamente .

Além disso, as funções nativas do Python continuam sendo cidadãos de segunda classe no mundo SQL. Esperamos que isso melhore no futuro com a serialização do Apache Arrow ( os esforços atuais direcionam dados,collection mas o UDF serde é um objetivo a longo prazo ).

Para projetos fortemente dependentes da base de código Python, alternativas puras de Python (como Dask ou Ray ) podem ser uma alternativa interessante.

Não precisa ser um vs. o outro

A API Spark DataFrame (SQL, Dataset) fornece uma maneira elegante de integrar o código Scala / Java no aplicativo PySpark. Você pode usar DataFramespara expor dados a um código JVM nativo e ler novamente os resultados. Eu expliquei algumas opções em outro lugar e você pode encontrar um exemplo prático de ida e volta em Python-Scala em Como usar uma classe Scala no Pyspark .

Ele pode ser aumentado ainda mais com a introdução de tipos definidos pelo usuário (consulte Como definir o esquema para o tipo personalizado no Spark SQL? ).


O que há de errado com o código fornecido na pergunta

(Aviso: ponto de vista Pythonista. Provavelmente perdi alguns truques do Scala)

Primeiro de tudo, há uma parte no seu código que não faz sentido. Se você já possui (key, value)pares criados usando zipWithIndexou enumeratequal é o objetivo de criar uma string apenas para dividi-la logo depois? flatMapnão funciona recursivamente, então você pode simplesmente gerar tuplas e pular as seguintes map.

Outra parte que acho problemática é reduceByKey. De um modo geral, reduceByKeyé útil se a aplicação da função agregada puder reduzir a quantidade de dados que precisam ser embaralhados. Como você simplesmente concatena as strings, não há nada a ganhar aqui. Ignorando coisas de baixo nível, como o número de referências, a quantidade de dados que você precisa transferir é exatamente a mesma que para groupByKey.

Normalmente eu não pensaria nisso, mas até onde eu sei, é um gargalo no seu código Scala. A união de cadeias de caracteres na JVM é uma operação bastante cara (consulte, por exemplo: A concatenação de cadeias de caracteres no scala é tão cara quanto em Java? ). Isso significa que algo assim, _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2) equivalente ao input4.reduceByKey(valsConcat)seu código, não é uma boa ideia.

Se você quer evitar groupByKeyque você pode tentar usar aggregateByKeycom StringBuilder. Algo semelhante a isso deve funcionar:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)

mas duvido que valha toda a confusão.

Tendo em mente o que foi dito acima, reescrevi seu código da seguinte maneira:

Scala :

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")

Python :

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

Resultados

No local[6]modo (CPU Intel (R) Xeon (E) E3-1245 V2 a 3.40GHz) com 4 GB de memória por executor, é necessário (n = 3):

  • Scala - média: 250.00s, stdev: 12.49
  • Python - médio: 246.66s, stdev: 1.15

Tenho certeza de que a maior parte desse tempo é gasta em embaralhar, serializar, desserializar e outras tarefas secundárias. Apenas por diversão, aqui está um código ingênuo de thread único no Python que executa a mesma tarefa nesta máquina em menos de um minuto:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])
zero323
fonte
22
Uma das respostas mais claras, abrangentes e úteis que encontrei por um tempo. Obrigado!
etov
Que cara excelente você é!
DennisLi
-4

Extensão às respostas acima -

O Scala prova-se mais rápido em muitos aspectos do que o python, mas existem algumas razões válidas pelas quais o python está se tornando mais popular que o scala, vamos ver alguns deles -

O Python para Apache Spark é muito fácil de aprender e usar. No entanto, essa não é a única razão pela qual o Pyspark é uma escolha melhor que o Scala. Tem mais.

A API Python para Spark pode ser mais lenta no cluster, mas, no final, os cientistas de dados podem fazer muito mais com isso em comparação com o Scala. A complexidade de Scala está ausente. A interface é simples e abrangente.

Falar sobre a legibilidade do código, manutenção e familiaridade com a API do Python para Apache Spark é muito melhor que o Scala.

O Python vem com várias bibliotecas relacionadas ao aprendizado de máquina e processamento de linguagem natural. Isso ajuda na análise de dados e também possui estatísticas muito maduras e testadas pelo tempo. Por exemplo, numpy, pandas, scikit-learn, seaborn e matplotlib.

Nota: A maioria dos cientistas de dados usa uma abordagem híbrida, na qual utiliza o melhor das duas APIs.

Por fim, a comunidade Scala geralmente acaba sendo muito menos útil para os programadores. Isso faz do Python um aprendizado muito valioso. Se você possui experiência suficiente com qualquer linguagem de programação estaticamente tipada como Java, pode parar de se preocupar em não usar o Scala por completo.


fonte