Tarefa não serializável: java.io.NotSerializableException ao chamar a função fora do fechamento apenas em classes e não em objetos

224

Obtendo um comportamento estranho ao chamar a função fora de um fechamento:

  • quando a função está em um objeto, tudo está funcionando
  • quando a função está em uma classe get:

Tarefa não serializável: java.io.NotSerializableException: testing

O problema é que eu preciso do meu código em uma classe e não em um objeto. Alguma idéia de por que isso está acontecendo? Um objeto Scala é serializado (padrão?)?

Este é um exemplo de código funcional:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

Este é o exemplo não útil:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}
Nimrod007
fonte
O que é o Spark.ctx? Não há nenhum objeto Spark com o método ctx AFAICT
javadba 30/10

Respostas:

334

Os RDDs estendem a interface Serialisable , portanto, não é isso que está causando a falha da sua tarefa. Agora, isso não significa que você pode serializar um RDDcom o Spark e evitarNotSerializableException

O Spark é um mecanismo de computação distribuído e sua abstração principal é um conjunto de dados distribuído resiliente ( RDD ), que pode ser visto como uma coleção distribuída. Basicamente, os elementos do RDD são particionados nos nós do cluster, mas o Spark abstrai isso do usuário, permitindo que ele interaja com o RDD (coleção) como se fosse um local.

Não entrar em muitos detalhes, mas quando você executar transformações diferentes em um RDD ( map, flatMap, filtere outros), o código de transformação (fechamento) é:

  1. serializado no nó do driver,
  2. enviado para os nós apropriados no cluster,
  3. desserializado,
  4. e finalmente executado nos nós

Obviamente, você pode executá-lo localmente (como no seu exemplo), mas todas essas fases (além do envio pela rede) ainda ocorrem. [Isso permite detectar erros antes de implantar na produção]

O que acontece no seu segundo caso é que você está chamando um método, definido na classe testingde dentro da função de mapa. O Spark vê que, e como os métodos não podem ser serializados por conta própria, o Spark tenta serializar toda a testing classe, para que o código ainda funcione quando executado em outra JVM. Você tem duas possibilidades:

Você pode tornar o teste de classe serializável, para que toda a classe possa ser serializada pelo Spark:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

ou você cria uma someFuncfunção em vez de um método (funções são objetos no Scala), para que o Spark possa serializá-lo:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}

Semelhante, mas não o mesmo problema com a serialização de classe, pode ser do seu interesse e você pode ler sobre isso nesta apresentação do Spark Summit 2013 .

Como observação lateral, você pode reescrever rddList.map(someFunc(_))para rddList.map(someFunc), eles são exatamente os mesmos. Geralmente, o segundo é preferido, pois é menos detalhado e limpo de ler.

EDIT (2015-03-15): O SPARK-5307 introduziu o SerializationDebugger e o Spark 1.3.0 é a primeira versão a usá-lo. Ele adiciona o caminho de serialização para um NotSerializableException . Quando uma NotSerializableException é encontrada, o depurador visita o gráfico do objeto para encontrar o caminho para o objeto que não pode ser serializado e constrói informações para ajudar o usuário a encontrar o objeto.

No caso do OP, é isso que é impresso no stdout:

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)
Grega Kešpret
fonte
1
Hmm, o que você explicou certamente faz sentido e explica por que toda a classe é serializada (algo que eu não entendi completamente). No entanto, continuarei afirmando que os rdds não são serializáveis ​​(bem, eles se estendem Serializable, mas isso não significa que eles não causam NotSerializableException, tente). É por isso que se você colocá-los fora das classes, ele corrige o erro. Vou editar minha resposta um pouco para ser mais preciso sobre o que quero dizer - ou seja, eles causam a exceção, não que eles estendam a interface.
samthebest
35
No caso de você não tem controle sobre a classe que você precisa para ser serializado ... se você estiver usando Scala você pode apenas instanciar com Serializable:val test = new Test with Serializable
Mark S
4
"rddList.map (someFunc (_)) para rddList.map (someFunc), eles são exatamente iguais" Não, eles não são exatamente iguais e, de fato, usar o último pode causar exceções de serialização, caso o primeiro não fosse.
samthebest
1
@samthebest você poderia explicar por que o map (someFunc (_)) não causaria exceções de serialização, enquanto o map (someFunc) causaria?
Alon
31

A resposta de Grega é ótima para explicar por que o código original não funciona e duas maneiras de corrigir o problema. No entanto, esta solução não é muito flexível; considere o caso em que seu fechamento inclui uma chamada de método em uma Serializableclasse que você não tem controle. Você não pode adicionar a Serializablemarca a essa classe nem alterar a implementação subjacente para transformar o método em uma função.

Nilesh apresenta uma ótima solução alternativa para isso, mas a solução pode ser feita de forma mais concisa e geral:

def genMapper[A, B](f: A => B): A => B = {
  val locker = com.twitter.chill.MeatLocker(f)
  x => locker.get.apply(x)
}

Esse serializador de funções pode ser usado para quebrar automaticamente fechamentos e chamadas de método:

rdd map genMapper(someFunc)

Essa técnica também tem o benefício de não exigir as dependências adicionais do Shark para acessar KryoSerializationWrapper, pois o Chill do Twitter já está sendo usado pelo Spark principal

Ben Sidhom
fonte
Olá, gostaria de registrar algo se usar seu código? Eu tentei e recebi uma exceção de classe Não foi possível encontrar do kryo. THX
G_cy 11/11
25

Palestra completa, explicando completamente o problema, que propõe uma ótima maneira de mudar de paradigma para evitar esses problemas de serialização: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- leaks-no-ws.md

A resposta mais votada é basicamente sugerir jogar fora um recurso de idioma inteiro - que não está mais usando métodos e apenas usando funções. De fato, em métodos de programação funcional, as classes devem ser evitadas, mas transformá-las em funções não resolve o problema de design aqui (veja o link acima).

Como uma solução rápida nessa situação específica, você pode simplesmente usar a @transientanotação para dizer para não tentar serializar o valor incorreto (aqui, Spark.ctxé uma classe personalizada, não a seguinte nomeação do OP do Spark):

@transient
val rddList = Spark.ctx.parallelize(list)

Você também pode reestruturar o código para que o rddList mora em outro lugar, mas isso também é desagradável.

O futuro é provavelmente esporos

No futuro, Scala incluirá essas coisas chamadas "esporos" que devem permitir o controle fino dos grãos, o que faz e o que não é exatamente puxado por um fechamento. Além disso, isso deve transformar todos os erros de puxar acidentalmente tipos não serializáveis ​​(ou quaisquer valores indesejados) em erros de compilação, e não agora, o que é horrível exceção em tempo de execução / vazamento de memória.

http://docs.scala-lang.org/sips/pending/spores.html

Uma dica sobre serialização Kryo

Ao usar o kyro, faça com que o registro seja necessário, isso significa que você obtém erros em vez de vazamentos de memória:

"Finalmente, eu sei que o kryo tem o kryo.setRegistrationOptional (true), mas estou com dificuldades para tentar descobrir como usá-lo. Quando essa opção está ativada, o kryo ainda parece lançar exceções se eu não tiver me registrado. Aulas."

Estratégia para registrar aulas com o kryo

Obviamente, isso fornece apenas controle em nível de tipo e não em nível de valor.

... mais idéias por vir.

samthebest
fonte
9

Resolvi esse problema usando uma abordagem diferente. Você só precisa serializar os objetos antes de passar pelo fechamento e desserializar depois. Essa abordagem funciona, mesmo que suas aulas não sejam serializáveis, porque ela usa o Kryo nos bastidores. Tudo que você precisa é de um pouco de curry. ;)

Aqui está um exemplo de como eu fiz isso:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

Sinta-se livre para tornar Blah o mais complicado que você quiser, classe, objeto complementar, classes aninhadas, referências a várias bibliotecas de terceiros.

KryoSerializationWrapper refere-se a: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

Nilesh
fonte
Isso realmente serializa a instância ou cria uma instância estática e serializa uma referência (veja minha resposta).
precisa
2
@samthebest você poderia elaborar? Se você investigar, KryoSerializationWrapperverá que isso faz o Spark pensar que é realmente java.io.Serializable- ele simplesmente serializa o objeto internamente usando o Kryo - mais rápido, mais simples. E eu não acho que lida com uma instância estática - apenas desserializa o valor quando o valor.apply () é chamado.
Nilesh
8

Enfrentei um problema semelhante, e o que entendi da resposta de Grega é

object NOTworking extends App {
 new testing().doIT
}
//adding extends Serializable wont help
class testing {

val list = List(1,2,3)

val rddList = Spark.ctx.parallelize(list)

def doIT =  {
  //again calling the fucntion someFunc 
  val after = rddList.map(someFunc(_))
  //this will crash (spark lazy)
  after.collect().map(println(_))
}

def someFunc(a:Int) = a+1

}

seu método doIT está tentando serializar someFunc (_) , mas como o método não é serializável, ele tenta serializar o teste de classe que, novamente, não é serializável.

Portanto, faça seu código funcionar, você deve definir someFunc dentro do método doIT . Por exemplo:

def doIT =  {
 def someFunc(a:Int) = a+1
  //function definition
 }
 val after = rddList.map(someFunc(_))
 after.collect().map(println(_))
}

E se houver várias funções entrando em cena, todas essas funções deverão estar disponíveis para o contexto pai.

Tarang Bhalodia
fonte
7

Não tenho certeza absoluta de que isso se aplique ao Scala, mas, em Java, resolvi NotSerializableExceptionrefatorando meu código para que o fechamento não acessasse um finalcampo não serializável .

Trebor Rude
fonte
Estou enfrentando o mesmo problema em Java, estou tentando usar a classe FileWriter do pacote Java IO dentro do método RDD foreach. Você pode me informar como podemos resolver isso.
Shankar
1
Bem, Shankar, se FileWriteré um finalcampo da classe externa, você não pode fazê-lo. Mas FileWriterpode ser construído a partir de a Stringou a File, os quais são Serializable. Portanto, refatorar seu código para construir um local com FileWriterbase no nome do arquivo da classe externa.
Trebor rude
0

Para sua informação, no Spark 2.4, muitos de vocês provavelmente encontrarão esse problema. A serialização do Kryo melhorou, mas em muitos casos você não pode usar spark.kryo.unsafe = true ou o ingênuo serializador do kryo.

Para uma solução rápida, tente alterar o seguinte na configuração do Spark

spark.kryo.unsafe="false"

OU

spark.serializer="org.apache.spark.serializer.JavaSerializer"

Modifico as transformações RDD personalizadas que encontro ou escrevo pessoalmente usando variáveis ​​de transmissão explícitas e utilizando a nova API embutida do twitter-chill, convertendo-as de rdd.map(row =>para rdd.mapPartitions(partition => {funções.

Exemplo

Caminho antigo (não ótimo)

val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row => {
    val value = sampleMap.get(row._1)
    value
})

Caminho (melhor) alternativo

import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))

rdd.mapPartitions(partition => {
    val deSerSampleMap = brdSerSampleMap.value.get
    partition.map(row => {
        val value = sampleMap.get(row._1)
        value
    }).toIterator
})

Essa nova maneira chamará a variável broadcast uma vez por partição, o que é melhor. Você ainda precisará usar a serialização Java se não registrar classes.

Igreja de Gabe
fonte