Como armazenar objetos personalizados no conjunto de dados?

149

De acordo com a introdução de conjuntos de dados Spark :

Como esperamos ansiosamente pelo Spark 2.0, planejamos algumas melhorias interessantes para conjuntos de dados, especificamente: ... Codificadores personalizados - enquanto atualmente geramos automaticamente codificadores para uma ampla variedade de tipos, gostaríamos de abrir uma API para objetos personalizados.

e tenta armazenar o tipo personalizado em um Datasetlead para o seguinte erro, como:

Não foi possível encontrar o codificador para o tipo armazenado em um conjunto de dados. Tipos primitivos (Int, String, etc) e Tipos de produto (classes de caso) são suportados pela importação de sqlContext.implicits._ O suporte para serializar outros tipos será adicionado em versões futuras

ou:

Java.lang.UnsupportedOperationException: nenhum codificador encontrado para ....

Existem soluções alternativas existentes?


Observe que esta pergunta existe apenas como um ponto de entrada para uma resposta do Wiki da Comunidade. Sinta-se livre para atualizar / melhorar a pergunta e a resposta.

zero323
fonte

Respostas:

240

Atualizar

Essa resposta ainda é válido e informativo, embora as coisas estão agora melhor desde 2.2 / 2.3, que adiciona suporte embutido codificador para Set, Seq, Map, Date, Timestamp, e BigDecimal. Se você prefere criar tipos apenas com classes de caso e os tipos usuais de Scala, deve ficar bem com apenas o implícito em SQLImplicits.


Infelizmente, praticamente nada foi adicionado para ajudar nisso. Buscando @since 2.0.0no Encoders.scalaou SQLImplicits.scalaencontra coisas a ver principalmente com tipos primitivos (e alguns ajustes de aulas de caso). Portanto, a primeira coisa a dizer: atualmente, não há um suporte realmente bom para codificadores de classes personalizadas . Com isso fora do caminho, a seguir, alguns truques que fazem um trabalho tão bom quanto podemos esperar, dado o que temos atualmente à nossa disposição. Como um aviso antecipado: isso não funcionará perfeitamente e farei o possível para tornar todas as limitações claras e diretas.

Qual é exatamente o problema

Quando você deseja criar um conjunto de dados, o Spark "requer um codificador (para converter um objeto JVM do tipo T para a representação interna do Spark SQL) que geralmente é criado automaticamente por meio de implícitos de a SparkSessionou pode ser criado explicitamente chamando métodos estáticos on Encoders"(retirado dos documentos emcreateDataset ). Um codificador assumirá o formato Encoder[T]onde Té o tipo que você está codificando. A primeira sugestão é adicionar import spark.implicits._(o que fornece esses codificadores implícitos) e a segunda sugestão é transmitir explicitamente o codificador implícito usando este conjunto de funções relacionadas ao codificador.

Não há codificador disponível para classes regulares, portanto

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

fornecerá o seguinte erro implícito relacionado ao tempo de compilação:

Não foi possível encontrar o codificador para o tipo armazenado em um conjunto de dados. Tipos primitivos (Int, String, etc) e Tipos de produto (classes de caso) são suportados pela importação de sqlContext.implicits._ O suporte para serializar outros tipos será adicionado em versões futuras

No entanto, se você agrupar qualquer tipo que você acabou de usar para obter o erro acima em alguma classe que se estende Product, o erro é adiado para o tempo de execução, portanto, confuso.

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

Compila muito bem, mas falha em tempo de execução com

java.lang.UnsupportedOperationException: nenhum codificador encontrado para MyObj

A razão para isso é que os codificadores que o Spark cria com os implícitos são realmente feitos apenas no tempo de execução (via scala relfection). Nesse caso, todas as verificações do Spark em tempo de compilação são de que a classe mais externa se estende Product(o que todas as classes de caso fazem) e só percebe em tempo de execução que ainda não sabe o que fazer MyObj(o mesmo problema ocorre se eu tentar fazer a Dataset[(Int,MyObj)]- O Spark aguarda até que o tempo de execução seja ativado MyObj. Estes são problemas centrais que precisam urgentemente de ser corrigidos:

  • algumas classes que estendem a Productcompilação apesar de sempre travarem no tempo de execução e
  • não há como passar codificadores personalizados para tipos aninhados (não tenho como alimentar um codificador Spark apenas para MyObjque ele saiba codificar Wrap[MyObj]ou (Int,MyObj)).

Apenas use kryo

A solução que todos sugerem é usar o kryocodificador.

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

Isso fica muito tedioso rapidamente. Especialmente se o seu código estiver manipulando todos os tipos de conjuntos de dados, ingressando, agrupando etc. Você acaba acumulando um monte de implícitos extras. Então, por que não deixar implícito que isso é feito automaticamente?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

E agora, parece que posso fazer quase tudo o que eu quero (o exemplo abaixo não funcionará no local spark-shellonde spark.implicits._é importado automaticamente)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

Ou quase. O problema é que o uso de kryoleads no Spark apenas armazena todas as linhas do conjunto de dados como um objeto binário simples. Por map, filter, foreachque é o suficiente, mas para operações como join, faísca realmente precisa que estes sejam separados em colunas. Inspecionando o esquema para d2ou d3, você vê que há apenas uma coluna binária:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

Solução parcial para tuplas

Portanto, usando a mágica dos implícitos no Scala (mais na 6.26.3 Sobrecarregando a resolução ), posso criar uma série de implícitos que farão o melhor trabalho possível, pelo menos para as tuplas, e funcionarão bem com os implícitos existentes:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

Então, armado com esses implícitos, posso fazer meu exemplo acima funcionar, embora com alguma coluna renomeando

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

Eu ainda não descobri como obter os nomes tupla esperados ( _1, _2...) por padrão sem renomeá-los - se alguém quer brincar com isso, este é o lugar onde o nome "value"fica introduzido e este é o lugar onde a tupla nomes são geralmente adicionados. No entanto, o ponto principal é que agora tenho um bom esquema estruturado:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

Portanto, em resumo, esta solução alternativa:

  • nos permite obter colunas separadas para tuplas (para que possamos entrar novamente em tuplas, yay!)
  • novamente podemos confiar nos implícitos (portanto, não é preciso estar passando por kryotodo o lugar)
  • é quase totalmente compatível com import spark.implicits._(com alguma renomeação envolvida)
  • se não vamos juntar as kyrocolunas binários serializados, e muito menos em campos aqueles que podem ter
  • tem o efeito colateral desagradável de renomear algumas das colunas da tupla para "valor" (se necessário, isso pode ser desfeito ao converter .toDF, especificar novos nomes de coluna e converter novamente em um conjunto de dados - e os nomes dos esquemas parecem preservados por meio de junções , onde eles são mais necessários).

Solução parcial para aulas em geral

Este é menos agradável e não tem boa solução. No entanto, agora que temos a solução de tupla acima, acho que a solução implícita de conversão de outra resposta também será um pouco menos dolorosa, pois você pode converter suas classes mais complexas em tuplas. Depois de criar o conjunto de dados, você provavelmente renomeará as colunas usando a abordagem de quadro de dados. Se tudo correr bem, isso é realmente uma melhoria, já que agora posso realizar junções nos campos das minhas aulas. Se eu tivesse acabado de usar um kryoserializador binário plano, isso não seria possível.

Aqui está um exemplo que faz um pouco de tudo: Eu tenho uma classe MyObjque tem campos de tipos Int, java.util.UUIDe Set[String]. O primeiro se cuida. O segundo, embora eu pudesse serializar usando, kryoseria mais útil se armazenado como a String(já que UUIDs são geralmente algo com o qual quero me unir). O terceiro realmente pertence apenas a uma coluna binária.

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

Agora, eu posso criar um conjunto de dados com um bom esquema usando este mecanismo:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

E o esquema me mostra colunas com os nomes corretos e com as duas primeiras coisas em que posso me unir.

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)
Alec
fonte
É possível criar uma classe personalizada ExpressionEncoderusando a serialização JSON? No meu caso eu não posso fugir com tuplas, e KRYO me dá uma coluna binária ..
Alexey Svyatkovskiy
1
@ AlexeyS Acho que não. Mas por que você quer isso? Por que você não consegue se safar da última solução que proponho? Se você pode colocar seus dados em JSON, você deve ser capaz de extrair os campos e colocá-los em uma classe caso ...
Alec
1
Infelizmente, a linha inferior desta resposta é que não há solução que funcione.
Baol
@baol Mais ou menos. Mas lembre-se de quão difícil é o que o Spark está fazendo. O sistema de tipos da Scala simplesmente não é poderoso o suficiente para "derivar" codificadores que passam recursivamente pelos campos. Francamente, estou surpreso que ninguém tenha feito uma macro de anotação para isso. Parece a solução natural (mas difícil).
Alec
1
@combinatorist Meu entendimento é que conjuntos de dados e quadros de dados (mas não os RDDs, pois não precisam de codificadores!) são equivalentes do ponto de vista de desempenho. Não subestime a segurança de tipo dos conjuntos de dados! Só porque o Spark usa internamente uma tonelada de reflexão, projeções, etc., não significa que você não deve se preocupar com a segurança de tipo da interface exposta. Mas isso me faz sentir melhor em criar minhas próprias funções de segurança de tipo baseadas em conjunto de dados que usam Dataframes sob o capô.
Alec
32
  1. Usando codificadores genéricos.

    Existem dois codificadores genéricos disponíveis no momento kryoe javaSerializationonde o último é explicitamente descrito como:

    extremamente ineficiente e só deve ser usado como último recurso.

    Assumindo a seguinte aula

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }

    você pode usar esses codificadores adicionando codificador implícito:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }

    que podem ser usados ​​juntos da seguinte maneira:

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }

    Ele armazena objetos como binarycoluna e, quando convertido para DataFramevocê, obtém o seguinte esquema:

    root
     |-- value: binary (nullable = true)

    Também é possível codificar tuplas usando o kryocodificador para um campo específico:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]

    Observe que não dependemos de codificadores implícitos aqui, mas passamos o codificador explicitamente para que isso provavelmente não funcione com o toDSmétodo

  2. Usando conversões implícitas:

    Forneça conversões implícitas entre a representação que pode ser codificada e a classe personalizada, por exemplo:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }

Perguntas relacionadas:

zero323
fonte
A solução 1 parece não funcionar para coleções digitadas (pelo menos Set) que recebo Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for Set[Bar].
Victor P.
@VictorP. Espera-se que eu tenho medo Em caso como este você vai precisar de um codificador para o tipo específico ( kryo[Set[Bar]]Da mesma forma que se a classe contém um campo. BarVocê precisa de codificador para um objeto inteiro Estes são métodos muito bruto..
zero323
@ zero323 Estou enfrentando o mesmo problema. Você pode colocar um exemplo de código de como codificar todo o projeto? Muito Obrigado!
Rocha
@Rock eu não tenho certeza que você entende por "projeto inteiro"
zero323
@ zero323 pelo seu comentário, "se a classe contiver um campo, Barvocê precisará de um codificador para um objeto inteiro". minha pergunta era como codificar esse "projeto inteiro"?
Rocha
9

Você pode usar o UDTRegistration e, em seguida, Classes de casos, Tuplas, etc ... todas funcionam corretamente com seu Tipo definido pelo usuário!

Digamos que você queira usar um Enum personalizado:

trait CustomEnum { def value:String }
case object Foo extends CustomEnum  { val value = "F" }
case object Bar extends CustomEnum  { val value = "B" }
object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}

Registre-o assim:

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)

Então use!

case class UsingCustomEnum(id:Int, en:CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())

Digamos que você queira usar um registro polimórfico:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly

... e use-o assim:

case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

Você pode escrever uma UDT personalizada que codifique tudo em bytes (estou usando a serialização java aqui, mas provavelmente é melhor instrumentar o contexto Kryo do Spark).

Primeiro defina a classe UDT:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray
  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

Então registre-o:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)

Então você pode usá-lo!

// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()
ChoppyTheLumberjack
fonte
1
Não vejo onde o seu kryo é usado (no CustomPolyUDT) #
mathieu
Estou tentando definir uma UDT no meu projeto e estou recebendo este erro "O símbolo UserDefinedType está inacessível neste local". Qualquer ajuda ?
Rijo Joseph
Oi @RijoJoseph. Você precisa criar um pacote org.apache.spark em seu projeto e colocar seu código UDT nele.
ChoppyTheLumberjack
6

Os codificadores funcionam mais ou menos da mesma maneira Spark2.0. E Kryoainda é a serializationescolha recomendada .

Você pode ver o exemplo a seguir com spark-shell

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> case class NormalPerson(name: String, age: Int) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class NormalPerson

scala> case class ReversePerson(name: Int, age: String) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class ReversePerson

scala> val normalPersons = Seq(
 |   NormalPerson("Superman", 25),
 |   NormalPerson("Spiderman", 17),
 |   NormalPerson("Ironman", 29)
 | )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))

scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds1.show()
+---------+---+
|     name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
|  Ironman| 29|
+---------+---+

scala> ds2.show()
+----+---------+
|name|      age|
+----+---------+
|  25| Superman|
|  17|Spiderman|
|  29|  Ironman|
+----+---------+

scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.

Até agora] não havia appropriate encodersno escopo atual, portanto nossas pessoas não foram codificadas como binaryvalores. Mas isso mudará assim que fornecermos alguns implicitcodificadores usando Kryoserialização.

// Provide Encoders

scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]

scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]

// Ecoders will be used since they are now present in Scope

scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]

scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]

// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

scala> ds4.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

// Our instances still work as expected    

scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.

scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.
sarveshseri
fonte
3

No caso da classe Java Bean, isso pode ser útil

import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])

Agora você pode simplesmente ler o dataFrame como DataFrame personalizado

dataFrame.as[MyClass]

Isso criará um codificador de classe personalizado e não um binário.

Akash Mahajan
fonte
1

Meus exemplos serão em Java, mas não imagino que seja difícil se adaptar ao Scala.

Eu tenho sido muito bem sucedida conversão RDD<Fruit>para Dataset<Fruit>usar spark.createDataset e Encoders.bean contanto que Fruité um simples Java Bean .

Etapa 1: Crie o Java Bean simples.

public class Fruit implements Serializable {
    private String name  = "default-fruit";
    private String color = "default-color";

    // AllArgsConstructor
    public Fruit(String name, String color) {
        this.name  = name;
        this.color = color;
    }

    // NoArgsConstructor
    public Fruit() {
        this("default-fruit", "default-color");
    }

    // ...create getters and setters for above fields
    // you figure it out
}

Gostaria de ter aulas com tipos primitivos e String como campos antes que o pessoal do DataBricks aprimore seus codificadores. Se você tiver uma classe com objeto aninhado, crie outro Java Bean simples com todos os seus campos nivelados, para poder usar as transformações RDD para mapear o tipo complexo para o mais simples.Claro que é um pouco de trabalho extra, mas imagino que ajudará muito no desempenho ao trabalhar com um esquema plano.

Etapa 2: Obtenha seu conjunto de dados no RDD

SparkSession spark = SparkSession.builder().getOrCreate();
JavaSparkContext jsc = new JavaSparkContext();

List<Fruit> fruitList = ImmutableList.of(
    new Fruit("apple", "red"),
    new Fruit("orange", "orange"),
    new Fruit("grape", "purple"));
JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);


RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);

E pronto! Espuma, enxágüe, repita.

Jimmy Da
fonte
Eu sugiro ressaltar que, para estruturas simples, você seria melhor atendido armazenando-as nos tipos nativos do Spark, em vez de serializá-las em um blob. Eles funcionam melhor no gateway Python, mais transparente no Parquet e podem até ser convertidos em estruturas da mesma forma.
metasim 22/08/18
1

Para aqueles que podem, na minha situação, coloquei minha resposta aqui também.

Para ser específico,

  1. Eu estava lendo 'Definir dados digitados' no SQLContext. O formato de dados original é o DataFrame.

    val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()

    +---+---+ | a| b| +---+---+ | 1|[1]| +---+---+

  2. Em seguida, converta-o em RDD usando rdd.map () com o tipo mutable.WrappedArray.

    sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)

    Resultado:

    (1,Set(1))

Taeheon Kwon
fonte
0

Além das sugestões já fornecidas, outra opção que descobri recentemente é que você pode declarar sua classe personalizada, incluindo a característica org.apache.spark.sql.catalyst.DefinedByConstructorParams.

Isso funciona se a classe tiver um construtor que use tipos que o ExpressionEncoder possa entender, isto é, valores primitivos e coleções padrão. Pode ser útil quando você não pode declarar a classe como uma classe de caso, mas não deseja usar o Kryo para codificá-la toda vez que for incluída em um conjunto de dados.

Por exemplo, eu queria declarar uma classe de caso que incluísse um vetor Breeze. O único codificador capaz de lidar com isso normalmente seria o Kryo. Mas se eu declarasse uma subclasse que estendeu o Breeze DenseVector e o DefinedByConstructorParams, o ExpressionEncoder entenderia que ele poderia ser serializado como uma matriz de Doubles.

Aqui está como eu o declarei:

class SerializableDenseVector(values: Array[Double]) extends breeze.linalg.DenseVector[Double](values) with DefinedByConstructorParams
implicit def BreezeVectorToSerializable(bv: breeze.linalg.DenseVector[Double]): SerializableDenseVector = bv.asInstanceOf[SerializableDenseVector]

Agora eu posso usar SerializableDenseVectorem um conjunto de dados (diretamente ou como parte de um produto) usando um simples ExpressionEncoder e nenhum Kryo. Funciona como um Breeze DenseVector, mas serializa como um Array [Duplo].

Matt
fonte