Obtendo um comportamento estranho ao chamar a função fora de um fechamento:
- quando a função está em um objeto, tudo está funcionando
- quando a função está em uma classe get:
Tarefa não serializável: java.io.NotSerializableException: testing
O problema é que eu preciso do meu código em uma classe e não em um objeto. Alguma idéia de por que isso está acontecendo? Um objeto Scala é serializado (padrão?)?
Este é um exemplo de código funcional:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
Este é o exemplo não útil:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
scala
serialization
apache-spark
typesafe
Nimrod007
fonte
fonte
Respostas:
Os RDDs estendem a interface Serialisable , portanto, não é isso que está causando a falha da sua tarefa. Agora, isso não significa que você pode serializar um
RDD
com o Spark e evitarNotSerializableException
O Spark é um mecanismo de computação distribuído e sua abstração principal é um conjunto de dados distribuído resiliente ( RDD ), que pode ser visto como uma coleção distribuída. Basicamente, os elementos do RDD são particionados nos nós do cluster, mas o Spark abstrai isso do usuário, permitindo que ele interaja com o RDD (coleção) como se fosse um local.
Não entrar em muitos detalhes, mas quando você executar transformações diferentes em um RDD (
map
,flatMap
,filter
e outros), o código de transformação (fechamento) é:Obviamente, você pode executá-lo localmente (como no seu exemplo), mas todas essas fases (além do envio pela rede) ainda ocorrem. [Isso permite detectar erros antes de implantar na produção]
O que acontece no seu segundo caso é que você está chamando um método, definido na classe
testing
de dentro da função de mapa. O Spark vê que, e como os métodos não podem ser serializados por conta própria, o Spark tenta serializar toda atesting
classe, para que o código ainda funcione quando executado em outra JVM. Você tem duas possibilidades:Você pode tornar o teste de classe serializável, para que toda a classe possa ser serializada pelo Spark:
ou você cria uma
someFunc
função em vez de um método (funções são objetos no Scala), para que o Spark possa serializá-lo:Semelhante, mas não o mesmo problema com a serialização de classe, pode ser do seu interesse e você pode ler sobre isso nesta apresentação do Spark Summit 2013 .
Como observação lateral, você pode reescrever
rddList.map(someFunc(_))
pararddList.map(someFunc)
, eles são exatamente os mesmos. Geralmente, o segundo é preferido, pois é menos detalhado e limpo de ler.EDIT (2015-03-15): O SPARK-5307 introduziu o SerializationDebugger e o Spark 1.3.0 é a primeira versão a usá-lo. Ele adiciona o caminho de serialização para um NotSerializableException . Quando uma NotSerializableException é encontrada, o depurador visita o gráfico do objeto para encontrar o caminho para o objeto que não pode ser serializado e constrói informações para ajudar o usuário a encontrar o objeto.
No caso do OP, é isso que é impresso no stdout:
fonte
val test = new Test with Serializable
A resposta de Grega é ótima para explicar por que o código original não funciona e duas maneiras de corrigir o problema. No entanto, esta solução não é muito flexível; considere o caso em que seu fechamento inclui uma chamada de método em uma
Serializable
classe que você não tem controle. Você não pode adicionar aSerializable
marca a essa classe nem alterar a implementação subjacente para transformar o método em uma função.Nilesh apresenta uma ótima solução alternativa para isso, mas a solução pode ser feita de forma mais concisa e geral:
Esse serializador de funções pode ser usado para quebrar automaticamente fechamentos e chamadas de método:
Essa técnica também tem o benefício de não exigir as dependências adicionais do Shark para acessar
KryoSerializationWrapper
, pois o Chill do Twitter já está sendo usado pelo Spark principalfonte
Palestra completa, explicando completamente o problema, que propõe uma ótima maneira de mudar de paradigma para evitar esses problemas de serialização: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- leaks-no-ws.md
A resposta mais votada é basicamente sugerir jogar fora um recurso de idioma inteiro - que não está mais usando métodos e apenas usando funções. De fato, em métodos de programação funcional, as classes devem ser evitadas, mas transformá-las em funções não resolve o problema de design aqui (veja o link acima).
Como uma solução rápida nessa situação específica, você pode simplesmente usar a
@transient
anotação para dizer para não tentar serializar o valor incorreto (aqui,Spark.ctx
é uma classe personalizada, não a seguinte nomeação do OP do Spark):Você também pode reestruturar o código para que o rddList mora em outro lugar, mas isso também é desagradável.
O futuro é provavelmente esporos
No futuro, Scala incluirá essas coisas chamadas "esporos" que devem permitir o controle fino dos grãos, o que faz e o que não é exatamente puxado por um fechamento. Além disso, isso deve transformar todos os erros de puxar acidentalmente tipos não serializáveis (ou quaisquer valores indesejados) em erros de compilação, e não agora, o que é horrível exceção em tempo de execução / vazamento de memória.
http://docs.scala-lang.org/sips/pending/spores.html
Uma dica sobre serialização Kryo
Ao usar o kyro, faça com que o registro seja necessário, isso significa que você obtém erros em vez de vazamentos de memória:
"Finalmente, eu sei que o kryo tem o kryo.setRegistrationOptional (true), mas estou com dificuldades para tentar descobrir como usá-lo. Quando essa opção está ativada, o kryo ainda parece lançar exceções se eu não tiver me registrado. Aulas."
Estratégia para registrar aulas com o kryo
Obviamente, isso fornece apenas controle em nível de tipo e não em nível de valor.
... mais idéias por vir.
fonte
Resolvi esse problema usando uma abordagem diferente. Você só precisa serializar os objetos antes de passar pelo fechamento e desserializar depois. Essa abordagem funciona, mesmo que suas aulas não sejam serializáveis, porque ela usa o Kryo nos bastidores. Tudo que você precisa é de um pouco de curry. ;)
Aqui está um exemplo de como eu fiz isso:
Sinta-se livre para tornar Blah o mais complicado que você quiser, classe, objeto complementar, classes aninhadas, referências a várias bibliotecas de terceiros.
KryoSerializationWrapper refere-se a: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
fonte
KryoSerializationWrapper
verá que isso faz o Spark pensar que é realmentejava.io.Serializable
- ele simplesmente serializa o objeto internamente usando o Kryo - mais rápido, mais simples. E eu não acho que lida com uma instância estática - apenas desserializa o valor quando o valor.apply () é chamado.Enfrentei um problema semelhante, e o que entendi da resposta de Grega é
seu método doIT está tentando serializar someFunc (_) , mas como o método não é serializável, ele tenta serializar o teste de classe que, novamente, não é serializável.
Portanto, faça seu código funcionar, você deve definir someFunc dentro do método doIT . Por exemplo:
E se houver várias funções entrando em cena, todas essas funções deverão estar disponíveis para o contexto pai.
fonte
Não tenho certeza absoluta de que isso se aplique ao Scala, mas, em Java, resolvi
NotSerializableException
refatorando meu código para que o fechamento não acessasse umfinal
campo não serializável .fonte
FileWriter
é umfinal
campo da classe externa, você não pode fazê-lo. MasFileWriter
pode ser construído a partir de aString
ou aFile
, os quais sãoSerializable
. Portanto, refatorar seu código para construir um local comFileWriter
base no nome do arquivo da classe externa.Para sua informação, no Spark 2.4, muitos de vocês provavelmente encontrarão esse problema. A serialização do Kryo melhorou, mas em muitos casos você não pode usar spark.kryo.unsafe = true ou o ingênuo serializador do kryo.
Para uma solução rápida, tente alterar o seguinte na configuração do Spark
OU
Modifico as transformações RDD personalizadas que encontro ou escrevo pessoalmente usando variáveis de transmissão explícitas e utilizando a nova API embutida do twitter-chill, convertendo-as de
rdd.map(row =>
parardd.mapPartitions(partition => {
funções.Exemplo
Caminho antigo (não ótimo)
Caminho (melhor) alternativo
Essa nova maneira chamará a variável broadcast uma vez por partição, o que é melhor. Você ainda precisará usar a serialização Java se não registrar classes.
fonte