Atualizar
Essa resposta ainda é válido e informativo, embora as coisas estão agora melhor desde 2.2 / 2.3, que adiciona suporte embutido codificador para Set
, Seq
, Map
, Date
, Timestamp
, e BigDecimal
. Se você prefere criar tipos apenas com classes de caso e os tipos usuais de Scala, deve ficar bem com apenas o implícito em SQLImplicits
.
Infelizmente, praticamente nada foi adicionado para ajudar nisso. Buscando @since 2.0.0
no Encoders.scala
ou SQLImplicits.scala
encontra coisas a ver principalmente com tipos primitivos (e alguns ajustes de aulas de caso). Portanto, a primeira coisa a dizer: atualmente, não há um suporte realmente bom para codificadores de classes personalizadas . Com isso fora do caminho, a seguir, alguns truques que fazem um trabalho tão bom quanto podemos esperar, dado o que temos atualmente à nossa disposição. Como um aviso antecipado: isso não funcionará perfeitamente e farei o possível para tornar todas as limitações claras e diretas.
Qual é exatamente o problema
Quando você deseja criar um conjunto de dados, o Spark "requer um codificador (para converter um objeto JVM do tipo T para a representação interna do Spark SQL) que geralmente é criado automaticamente por meio de implícitos de a SparkSession
ou pode ser criado explicitamente chamando métodos estáticos on Encoders
"(retirado dos documentos emcreateDataset
). Um codificador assumirá o formato Encoder[T]
onde T
é o tipo que você está codificando. A primeira sugestão é adicionar import spark.implicits._
(o que fornece esses codificadores implícitos) e a segunda sugestão é transmitir explicitamente o codificador implícito usando este conjunto de funções relacionadas ao codificador.
Não há codificador disponível para classes regulares, portanto
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
fornecerá o seguinte erro implícito relacionado ao tempo de compilação:
Não foi possível encontrar o codificador para o tipo armazenado em um conjunto de dados. Tipos primitivos (Int, String, etc) e Tipos de produto (classes de caso) são suportados pela importação de sqlContext.implicits._ O suporte para serializar outros tipos será adicionado em versões futuras
No entanto, se você agrupar qualquer tipo que você acabou de usar para obter o erro acima em alguma classe que se estende Product
, o erro é adiado para o tempo de execução, portanto, confuso.
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
Compila muito bem, mas falha em tempo de execução com
java.lang.UnsupportedOperationException: nenhum codificador encontrado para MyObj
A razão para isso é que os codificadores que o Spark cria com os implícitos são realmente feitos apenas no tempo de execução (via scala relfection). Nesse caso, todas as verificações do Spark em tempo de compilação são de que a classe mais externa se estende Product
(o que todas as classes de caso fazem) e só percebe em tempo de execução que ainda não sabe o que fazer MyObj
(o mesmo problema ocorre se eu tentar fazer a Dataset[(Int,MyObj)]
- O Spark aguarda até que o tempo de execução seja ativado MyObj
. Estes são problemas centrais que precisam urgentemente de ser corrigidos:
- algumas classes que estendem a
Product
compilação apesar de sempre travarem no tempo de execução e
- não há como passar codificadores personalizados para tipos aninhados (não tenho como alimentar um codificador Spark apenas para
MyObj
que ele saiba codificar Wrap[MyObj]
ou (Int,MyObj)
).
Apenas use kryo
A solução que todos sugerem é usar o kryo
codificador.
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
Isso fica muito tedioso rapidamente. Especialmente se o seu código estiver manipulando todos os tipos de conjuntos de dados, ingressando, agrupando etc. Você acaba acumulando um monte de implícitos extras. Então, por que não deixar implícito que isso é feito automaticamente?
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)
E agora, parece que posso fazer quase tudo o que eu quero (o exemplo abaixo não funcionará no local spark-shell
onde spark.implicits._
é importado automaticamente)
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
Ou quase. O problema é que o uso de kryo
leads no Spark apenas armazena todas as linhas do conjunto de dados como um objeto binário simples. Por map
, filter
, foreach
que é o suficiente, mas para operações como join
, faísca realmente precisa que estes sejam separados em colunas. Inspecionando o esquema para d2
ou d3
, você vê que há apenas uma coluna binária:
d2.printSchema
// root
// |-- value: binary (nullable = true)
Solução parcial para tuplas
Portanto, usando a mágica dos implícitos no Scala (mais na 6.26.3 Sobrecarregando a resolução ), posso criar uma série de implícitos que farão o melhor trabalho possível, pelo menos para as tuplas, e funcionarão bem com os implícitos existentes:
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._ // we can still take advantage of all the old implicits
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
// ... you can keep making these
Então, armado com esses implícitos, posso fazer meu exemplo acima funcionar, embora com alguma coluna renomeando
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
Eu ainda não descobri como obter os nomes tupla esperados ( _1
, _2
...) por padrão sem renomeá-los - se alguém quer brincar com isso, este é o lugar onde o nome "value"
fica introduzido e este é o lugar onde a tupla nomes são geralmente adicionados. No entanto, o ponto principal é que agora tenho um bom esquema estruturado:
d4.printSchema
// root
// |-- _1: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
// |-- _2: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
Portanto, em resumo, esta solução alternativa:
- nos permite obter colunas separadas para tuplas (para que possamos entrar novamente em tuplas, yay!)
- novamente podemos confiar nos implícitos (portanto, não é preciso estar passando por
kryo
todo o lugar)
- é quase totalmente compatível com
import spark.implicits._
(com alguma renomeação envolvida)
- se não vamos juntar as
kyro
colunas binários serializados, e muito menos em campos aqueles que podem ter
- tem o efeito colateral desagradável de renomear algumas das colunas da tupla para "valor" (se necessário, isso pode ser desfeito ao converter
.toDF
, especificar novos nomes de coluna e converter novamente em um conjunto de dados - e os nomes dos esquemas parecem preservados por meio de junções , onde eles são mais necessários).
Solução parcial para aulas em geral
Este é menos agradável e não tem boa solução. No entanto, agora que temos a solução de tupla acima, acho que a solução implícita de conversão de outra resposta também será um pouco menos dolorosa, pois você pode converter suas classes mais complexas em tuplas. Depois de criar o conjunto de dados, você provavelmente renomeará as colunas usando a abordagem de quadro de dados. Se tudo correr bem, isso é realmente uma melhoria, já que agora posso realizar junções nos campos das minhas aulas. Se eu tivesse acabado de usar um kryo
serializador binário plano, isso não seria possível.
Aqui está um exemplo que faz um pouco de tudo: Eu tenho uma classe MyObj
que tem campos de tipos Int
, java.util.UUID
e Set[String]
. O primeiro se cuida. O segundo, embora eu pudesse serializar usando, kryo
seria mais útil se armazenado como a String
(já que UUID
s são geralmente algo com o qual quero me unir). O terceiro realmente pertence apenas a uma coluna binária.
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])
// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
Agora, eu posso criar um conjunto de dados com um bom esquema usando este mecanismo:
val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
E o esquema me mostra colunas com os nomes corretos e com as duas primeiras coisas em que posso me unir.
d.printSchema
// root
// |-- i: integer (nullable = false)
// |-- u: string (nullable = true)
// |-- s: binary (nullable = true)
ExpressionEncoder
usando a serialização JSON? No meu caso eu não posso fugir com tuplas, e KRYO me dá uma coluna binária ..Usando codificadores genéricos.
Existem dois codificadores genéricos disponíveis no momento
kryo
ejavaSerialization
onde o último é explicitamente descrito como:Assumindo a seguinte aula
você pode usar esses codificadores adicionando codificador implícito:
que podem ser usados juntos da seguinte maneira:
Ele armazena objetos como
binary
coluna e, quando convertido paraDataFrame
você, obtém o seguinte esquema:Também é possível codificar tuplas usando o
kryo
codificador para um campo específico:Observe que não dependemos de codificadores implícitos aqui, mas passamos o codificador explicitamente para que isso provavelmente não funcione com o
toDS
métodoUsando conversões implícitas:
Forneça conversões implícitas entre a representação que pode ser codificada e a classe personalizada, por exemplo:
Perguntas relacionadas:
fonte
Set
) que receboException in thread "main" java.lang.UnsupportedOperationException: No Encoder found for Set[Bar]
.kryo[Set[Bar]]
Da mesma forma que se a classe contém um campo.Bar
Você precisa de codificador para um objeto inteiro Estes são métodos muito bruto..Bar
você precisará de um codificador para um objeto inteiro". minha pergunta era como codificar esse "projeto inteiro"?Você pode usar o UDTRegistration e, em seguida, Classes de casos, Tuplas, etc ... todas funcionam corretamente com seu Tipo definido pelo usuário!
Digamos que você queira usar um Enum personalizado:
Registre-o assim:
Então use!
Digamos que você queira usar um registro polimórfico:
... e use-o assim:
Você pode escrever uma UDT personalizada que codifique tudo em bytes (estou usando a serialização java aqui, mas provavelmente é melhor instrumentar o contexto Kryo do Spark).
Primeiro defina a classe UDT:
Então registre-o:
Então você pode usá-lo!
fonte
Os codificadores funcionam mais ou menos da mesma maneira
Spark2.0
. EKryo
ainda é aserialization
escolha recomendada .Você pode ver o exemplo a seguir com spark-shell
Até agora] não havia
appropriate encoders
no escopo atual, portanto nossas pessoas não foram codificadas comobinary
valores. Mas isso mudará assim que fornecermos algunsimplicit
codificadores usandoKryo
serialização.fonte
No caso da classe Java Bean, isso pode ser útil
Agora você pode simplesmente ler o dataFrame como DataFrame personalizado
Isso criará um codificador de classe personalizado e não um binário.
fonte
Meus exemplos serão em Java, mas não imagino que seja difícil se adaptar ao Scala.
Eu tenho sido muito bem sucedida conversão
RDD<Fruit>
paraDataset<Fruit>
usar spark.createDataset e Encoders.bean contanto queFruit
é um simples Java Bean .Etapa 1: Crie o Java Bean simples.
Gostaria de ter aulas com tipos primitivos e String como campos antes que o pessoal do DataBricks aprimore seus codificadores. Se você tiver uma classe com objeto aninhado, crie outro Java Bean simples com todos os seus campos nivelados, para poder usar as transformações RDD para mapear o tipo complexo para o mais simples.Claro que é um pouco de trabalho extra, mas imagino que ajudará muito no desempenho ao trabalhar com um esquema plano.
Etapa 2: Obtenha seu conjunto de dados no RDD
E pronto! Espuma, enxágüe, repita.
fonte
Para aqueles que podem, na minha situação, coloquei minha resposta aqui também.
Para ser específico,
Eu estava lendo 'Definir dados digitados' no SQLContext. O formato de dados original é o DataFrame.
val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()
+---+---+ | a| b| +---+---+ | 1|[1]| +---+---+
Em seguida, converta-o em RDD usando rdd.map () com o tipo mutable.WrappedArray.
sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)
Resultado:
(1,Set(1))
fonte
Além das sugestões já fornecidas, outra opção que descobri recentemente é que você pode declarar sua classe personalizada, incluindo a característica
org.apache.spark.sql.catalyst.DefinedByConstructorParams
.Isso funciona se a classe tiver um construtor que use tipos que o ExpressionEncoder possa entender, isto é, valores primitivos e coleções padrão. Pode ser útil quando você não pode declarar a classe como uma classe de caso, mas não deseja usar o Kryo para codificá-la toda vez que for incluída em um conjunto de dados.
Por exemplo, eu queria declarar uma classe de caso que incluísse um vetor Breeze. O único codificador capaz de lidar com isso normalmente seria o Kryo. Mas se eu declarasse uma subclasse que estendeu o Breeze DenseVector e o DefinedByConstructorParams, o ExpressionEncoder entenderia que ele poderia ser serializado como uma matriz de Doubles.
Aqui está como eu o declarei:
Agora eu posso usar
SerializableDenseVector
em um conjunto de dados (diretamente ou como parte de um produto) usando um simples ExpressionEncoder e nenhum Kryo. Funciona como um Breeze DenseVector, mas serializa como um Array [Duplo].fonte