Como posso converter um RDD ( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
) em um Dataframe org.apache.spark.sql.DataFrame
. Eu converti um dataframe para rdd usando .rdd
. Depois de processá-lo, quero-o novamente no dataframe. Como posso fazer isso ?
scala
apache-spark
apache-spark-sql
rdd
user568109
fonte
fonte
Respostas:
SqlContext
tem várioscreateDataFrame
métodos que criam umDataFrame
dadoRDD
. Eu imagino que um deles funcione para o seu contexto.Por exemplo:
fonte
Esse código funciona perfeitamente no Spark 2.x com Scala 2.11
Importar classes necessárias
Criar
SparkSession
objeto, e aqui estáspark
Vamos
RDD
fazer issoDataFrame
Método 1
Usando
SparkSession.createDataFrame(RDD obj)
.Método 2
Usando
SparkSession.createDataFrame(RDD obj)
e especificando nomes de colunas.Método 3 (resposta real à pergunta)
Dessa forma, a entrada
rdd
deve ser do tipoRDD[Row]
.crie o esquema
Agora aplique tanto
rowsRdd
eschema
paracreateDataFrame()
fonte
Supondo que seu RDD [linha] seja chamado rdd, você pode usar:
fonte
Nota: Esta resposta foi originalmente publicada aqui
Estou postando esta resposta porque gostaria de compartilhar detalhes adicionais sobre as opções disponíveis que não encontrei nas outras respostas
Para criar um DataFrame a partir de um RDD de linhas, existem duas opções principais:
1) Como já mencionado, você pode usar o
toDF()
que pode ser importado porimport sqlContext.implicits._
. No entanto, essa abordagem funciona apenas para os seguintes tipos de RDDs:RDD[Int]
RDD[Long]
RDD[String]
RDD[T <: scala.Product]
(fonte: Scaladoc do
SQLContext.implicits
objeto)A última assinatura realmente significa que pode funcionar para um RDD de tuplas ou um RDD de classes de caso (porque tuplas e classes de caso são subclasses de
scala.Product
).Portanto, para usar essa abordagem para um
RDD[Row]
, você deve mapeá-lo para umRDD[T <: scala.Product]
. Isso pode ser feito mapeando cada linha para uma classe de caso personalizada ou uma tupla, como nos seguintes trechos de código:ou
A principal desvantagem dessa abordagem (na minha opinião) é que você precisa definir explicitamente o esquema do DataFrame resultante na função de mapa, coluna por coluna. Talvez isso possa ser feito através de programação se você não conhecer o esquema com antecedência, mas as coisas podem ficar um pouco confusas por lá. Então, como alternativa, existe outra opção:
2) Você pode usar
createDataFrame(rowRDD: RDD[Row], schema: StructType)
como na resposta aceita, disponível no objeto SQLContext . Exemplo para converter um RDD de um DataFrame antigo:Observe que não há necessidade de definir explicitamente nenhuma coluna de esquema. Reutilizamos o esquema antigo do DF, que é de
StructType
classe e pode ser facilmente estendido. No entanto, essa abordagem às vezes não é possível e, em alguns casos, pode ser menos eficiente que a primeira.fonte
import sqlContext.implicits.
Suponha que você tenha um
DataFrame
e deseje fazer alguma modificação nos dados dos campos, convertendo-os paraRDD[Row]
.Para converter de volta para
DataFrame
partirRDD
, precisamos definir o tipo de estrutura doRDD
.Se o tipo de dados foi
Long
, ele se tornará comoLongType
na estrutura.Se
String
entãoStringType
na estrutura.Agora você pode converter o RDD em DataFrame usando o método createDataFrame .
fonte
Aqui está um exemplo simples de converter sua lista em Spark RDD e, em seguida, converter esse Spark RDD em Dataframe.
Observe que eu usei o scala REPL do Spark-shell para executar o seguinte código: Aqui sc é uma instância do SparkContext que está implicitamente disponível no Spark-shell. Espero que ele responda sua pergunta.
fonte
Método 1: (Scala)
Método 2: (Scala)
Método 1: (Python)
Método 2: (Python)
Extraiu o valor do objeto de linha e aplicou a classe de caso para converter rdd em DF
fonte
Nas versões mais recentes do spark (2.0+)
fonte
Supondo que val spark seja um produto de um SparkSession.builder ...
Mesmas etapas, mas com menos declarações val:
fonte
Tentei explicar a solução usando o problema de contagem de palavras . 1. Leia o arquivo usando sc
Métodos para criar DF
Ler arquivo usando o spark
Rdd para Dataframe
val df = sc.textFile ("D: // cca175 / data /") .toDF ("t1") df.show
Método 1
Criar RDD de contagem de palavras no Dataframe
Método2
Criar Dataframe a partir do Rdd
Método3
Definir esquema
importar org.apache.spark.sql.types._
esquema de val = novo StructType (). add (StructField ("word", StringType, true)). add (StructField ("count", StringType, true))
Criar RowRDD
Criar DataFrame a partir do RDD com esquema
val df = spark.createDataFrame (rowRdd, esquema)
df.show
fonte
Para converter uma matriz [linha] em DataFrame ou conjunto de dados, o seguinte funciona de maneira elegante:
Digamos, o esquema é o StructType da linha e, em seguida,
fonte