Como posso alterar os tipos de coluna no DataFrame do Spark SQL?

152

Suponha que eu esteja fazendo algo como:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment
1997 Ford  E350  Go get one now th...

Mas eu realmente queria o yearas Int(e talvez transformar algumas outras colunas).

O melhor que eu pude fazer foi

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

o que é um pouco complicado.

Eu sou do R e estou acostumado a escrever, por exemplo

df2 <- df %>%
   mutate(year = year %>% as.integer,
          make = make %>% toupper)

Provavelmente estou perdendo alguma coisa, já que deve haver uma maneira melhor de fazer isso no Spark / Scala ...

kevinykuo
fonte
Eu gosto desta maneira spark.sql ("SELECT STRING (NULLIF (coluna, '')) como column_string")
Eric Bellet

Respostas:

141

Editar: versão mais recente

Desde o spark 2.x você pode usar .withColumn. Confira os documentos aqui:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@withColumn(colName:String,col:org.apache.spark.sql.Column) : org.apache.spark.sql.DataFrame

Resposta mais antiga

Desde o Spark versão 1.4, você pode aplicar o método de conversão com DataType na coluna:

import org.apache.spark.sql.types.IntegerType
val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType))
    .drop("year")
    .withColumnRenamed("yearTmp", "year")

Se você estiver usando expressões sql, também poderá fazer:

val df2 = df.selectExpr("cast(year as int) year", 
                        "make", 
                        "model", 
                        "comment", 
                        "blank")

Para obter mais informações, consulte os documentos: http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame

msemelman
fonte
4
por que você usou withColumn seguido de drop? Não é mais fácil usar withColumn com o nome da coluna original?
Ameba Spugnosa
@AmebaSpugnosa Acho que, quando o usei, o Spark travou se tivesse nomes de colunas repetidos. Não quando você os cria, mas quando os usa.
precisa saber é o seguinte
5
não há necessidade de descartar a coluna seguida por uma renomeação. Você pode fazer em uma linha #df.withColumn("ctr", temp("ctr").cast(DecimalType(decimalPrecision, decimalScale)))
ruhong
1
Uma cópia nova do quadro de dados é criada apenas para reformular uma coluna nesse caso? Estou esquecendo de algo? Ou talvez haja alguma otimização nos bastidores?
user1814008
5
Indo pelas docs de Spark 2.x, df.withColumn(..)pode adicionar ou substituir uma coluna dependendo do colNameargumento
y2k-shubham
89

[EDIT: março de 2016: obrigado pelos votos! Embora, na verdade, esta não é a melhor resposta, eu acho que as soluções baseadas em withColumn, withColumnRenamede castapresentada pelo msemelman, Martin Senne e outros são mais simples e mais limpo].

Acho que sua abordagem está correta, lembre-se de que um Spark DataFrameé um RDD (imutável) de linhas, portanto nunca substituímos uma coluna, apenas criando novas a DataFramecada vez com um novo esquema.

Supondo que você tenha um df original com o seguinte esquema:

scala> df.printSchema
root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)

E algumas UDFs definidas em uma ou várias colunas:

import org.apache.spark.sql.functions._

val toInt    = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) 
val days_since_nearest_holidays = udf( 
  (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
 )

A alteração dos tipos de colunas ou a criação de um novo DataFrame a partir de outro pode ser escrito assim:

val featureDf = df
.withColumn("departureDelay", toDouble(df("DepDelay")))
.withColumn("departureHour",  toHour(df("CRSDepTime")))
.withColumn("dayOfWeek",      toInt(df("DayOfWeek")))              
.withColumn("dayOfMonth",     toInt(df("DayofMonth")))              
.withColumn("month",          toInt(df("Month")))              
.withColumn("distance",       toDouble(df("Distance")))              
.withColumn("nearestHoliday", days_since_nearest_holidays(
              df("Year"), df("Month"), df("DayofMonth"))
            )              
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", 
        "month", "distance", "nearestHoliday")            

que produz:

scala> df.printSchema
root
 |-- departureDelay: double (nullable = true)
 |-- departureHour: integer (nullable = true)
 |-- dayOfWeek: integer (nullable = true)
 |-- dayOfMonth: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- nearestHoliday: integer (nullable = true)

Isso é bem próximo da sua própria solução. Simplesmente, manter as alterações de tipo e outras transformações como udf vals separadas torna o código mais legível e reutilizável.

Svend
fonte
26
Isso não é seguro nem eficiente. Não é seguro porque uma NULLentrada única ou malformada trava um trabalho inteiro. Não é eficiente porque os UDFs não são transparentes para o Catalyst. Usar UDFs para operações complexas é bom, mas não há razão para usá-los para a conversão de tipos básicos. É por isso que temos castmétodo (veja uma resposta de Martin Senne ). Tornar as coisas transparentes para o Catalyst exige mais trabalho, mas a segurança básica é apenas uma questão de colocar Trye Optiontrabalhar.
Zero323 01/03
Não vi nada relacionado à conversão de string para data, por exemplo "05-ABR-2015"
dbspace 29/04
3
Existe uma maneira de reduzir sua withColumn()seção a uma seção genérica que itera por todas as colunas?
Boern 17/05
Graças a zero323, ao ler isso, descobri por que a solução udf aqui falha. Alguns comentários são melhores do que algumas respostas sobre SO :)
Simon Dirmeier
Existe alguma maneira de conhecer a linha corrompida, significa registros que estão tendo colunas de tipos de dados incorretos durante a transmissão. Como função de
conversão
65

Como a castoperação está disponível para o Spark Column(e como eu pessoalmente não udfsou a favor , como proposto por @ Svendneste momento), que tal:

df.select( df("year").cast(IntegerType).as("year"), ... )

transmitir para o tipo solicitado? Como um efeito colateral puro, os valores não convertíveis / "conversíveis" nesse sentido se tornarão null.

Caso você precise disso como um método auxiliar , use:

object DFHelper{
  def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
    df.withColumn( cn, df(cn).cast(tpe) )
  }
}

que é usado como:

import DFHelper._
val df2 = castColumnTo( df, "year", IntegerType )
Martin Senne
fonte
2
Você pode me aconselhar sobre como proceder, se eu precisar converter e renomear um monte de colunas (eu tenho 50 colunas e relativamente novo para o scala, não tenho certeza de qual é a melhor maneira de abordá-lo sem criar uma duplicação maciça)? Algumas colunas devem permanecer como String, outras devem ser convertidas para Float.
Dmitry Smirnov
como converter uma String em uma data, por exemplo "25-APR-2016" na coluna e "20160302"
dbspace 29/04
@DmitrySmirnov Você já recebeu uma resposta? Eu tenho a mesma pergunta. ;)
Evan Zamir 22/03
@EvanZamir infelizmente não, acabei fazendo um shitton de operações para poder usar dados como rdd em outras etapas. Pergunto-me se isso se tornou mais fácil nos dias de hoje :)
Dmitry Smirnov
60

Primeiro , se você quer transmitir o tipo, então isto:

import org.apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))

Com o mesmo nome da coluna, a coluna será substituída por uma nova. Você não precisa adicionar e excluir etapas.

Em segundo lugar , sobre Scala vs R .
Este é o código que mais se assemelha ao RI:

val df2 = df.select(
   df.columns.map {
     case year @ "year" => df(year).cast(IntegerType).as(year)
     case make @ "make" => functions.upper(df(make)).as(make)
     case other         => df(other)
   }: _*
)

Embora o comprimento do código seja um pouco mais longo que o do R. Isso não tem nada a ver com a verbosidade da linguagem. Em R, mutateé uma função especial para o dataframe R, enquanto em Scala você pode facilmente ad-hoc, graças ao seu poder expressivo.
Em resumo, evita soluções específicas, porque o design do idioma é bom o suficiente para você criar rápida e facilmente seu próprio idioma de domínio.


nota lateral: df.columnsé surpreendentemente um em Array[String]vez de Array[Column], talvez eles queiram que pareça com o quadro de dados do Python pandas.

WeiChing 林 煒 清
fonte
1
Você poderia dar o equivalente ao pyspark?
Harit Vishwakarma
Estou recebendo "início ilegal de definição" .comColumn ("age", $ "age" .cast (sql.types.DoubleType)) para o meu campo "age". Alguma sugestão?
Bluedolphin
Você precisa .cache () do quadro de dados se estiver fazendo essas conversões em várias colunas por motivos de desempenho ou não for necessário, pois o Spark as otimiza?
Skjagini 06/03/19
A importação pode ser import org.apache.spark.sql.types._e, em vez de sql.types.IntegerTypeapenas IntegerType.
nessa.gp
17

Você pode usar selectExprpara torná-lo um pouco mais limpo:

df.selectExpr("cast(year as int) as year", "upper(make) as make",
    "model", "comment", "blank")
dnlbrky
fonte
14

Código Java para modificar o tipo de dados do DataFrame de String para Inteiro

df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))

Simplesmente converterá o existente (tipo de dados String) para Inteiro.

manishbelsare
fonte
1
Não há DataTypesdentro sql.types! ele é DataType. Além disso, pode-se simplesmente importar IntegerTypee lançar.
Ehsan M. Kermani
@ EhsanM.Kermani, na verdade, DatyaTypes.IntegerType é uma referência legítima.
Cupitor
1
@Cupitor DataTypes.IntegerTypeutilizado para estar em modo DeveloperAPI e é estável em v.2.1.0
Ehsan M. Kermani
Essa é a melhor solução!
Simon Dirmeier 14/10
8

Para converter o ano de string para int, você pode adicionar a seguinte opção ao leitor csv: "inferSchema" -> "true", consulte a documentação do DataBricks

Peter Rose
fonte
5
Isso funciona bem, mas o problema é que o leitor deve fazer uma segunda passagem de seu arquivo
beefyhalo
@beefyhalo absolutamente local, existe alguma maneira de contornar isso?
Ayush
6

Portanto, isso realmente funciona se você tiver problemas para salvar em um driver jdbc como o sqlserver, mas é realmente útil para erros nos quais você encontrará erros de sintaxe e tipos.

import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.jdbc.JdbcType
val SQLServerDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
    case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
    case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
    case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
    case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
    case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
    case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
    //      case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
    case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
  }
}

JdbcDialects.registerDialect(SQLServerDialect)
Ben Jarman
fonte
Você pode me ajudar a implementar o mesmo código em Java? e como registrar o customJdbcDialect no DataFrame
abhijitcaps
Bom, fiz o mesmo com o Vertica, mas desde o spark 2.1. JDbcUtil, você precisa implementar apenas o tipo de dados específico necessário. . dialect.getJDBCType (dt) .orElse (getCommonJDBCType (dt)) getOrElse (throw new IllegalArgumentException (s "Não é possível obter o tipo JDBC para $ {dt.simpleString}"))
Arnon Rodman
6

Gere um conjunto de dados simples contendo cinco valores e converta intpara o stringtipo:

val df = spark.range(5).select( col("id").cast("string") )
user8106134
fonte
6

Eu acho que isso é muito mais legível para mim.

import org.apache.spark.sql.types._
df.withColumn("year", df("year").cast(IntegerType))

Isso converterá a coluna do ano para IntegerTypea criação de colunas temporárias e a remoção dessas colunas. Se você deseja converter para qualquer outro tipo de dados, pode verificar os tipos dentro do org.apache.spark.sql.typespacote.

Piyush Patel
fonte
5

as respostas que sugerem o uso de elenco, FYI, o método de elenco no spark 1.4.1 estão quebradas.

por exemplo, um quadro de dados com uma coluna de cadeia com o valor "8182175552014127960" quando convertido em bigint tem o valor "8182175552014128100"

    df.show
+-------------------+
|                  a|
+-------------------+
|8182175552014127960|
+-------------------+

    df.selectExpr("cast(a as bigint) a").show
+-------------------+
|                  a|
+-------------------+
|8182175552014128100|
+-------------------+

Tivemos que enfrentar muitos problemas antes de encontrar esse bug, porque tínhamos grandes colunas em produção.

sauraI3h
fonte
4
psst, atualize seu spark
msemelman
2
@msemelman é ridículo ter que atualizar para uma nova versão do spark em produção por um pequeno bug.
SauraI3h
nem sempre atualizamos tudo para pequenos bugs? :)
caesarsol
5
df.select($"long_col".cast(IntegerType).as("int_col"))
soulmachine
fonte
4

Usando o Spark Sql 2.4.0, você pode fazer isso:

spark.sql("SELECT STRING(NULLIF(column,'')) as column_string")
Eric Bellet
fonte
3

Você pode usar o código abaixo.

df.withColumn("year", df("year").cast(IntegerType))

O qual converterá a coluna do ano em IntegerTypecoluna.

adarsh
fonte
2

Este método descartará a coluna antiga e criará novas colunas com os mesmos valores e novo tipo de dados. Meus tipos de dados originais quando o DataFrame foi criado foram: -

root
 |-- id: integer (nullable = true)
 |-- flag1: string (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag3: string (nullable = true)

Depois disso, executei o seguinte código para alterar o tipo de dados: -

df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)

Depois disso, meu resultado passou a ser: -

root
 |-- id: integer (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag1: boolean (nullable = true)
 |-- flag3: boolean (nullable = true)
PirateJack
fonte
Você poderia fornecer sua solução aqui.
Ajay Kharade
1

Pode-se alterar o tipo de dados de uma coluna usando cast no spark sql. nome da tabela é tabela e possui apenas duas colunas, o tipo de dados coluna1 e coluna2 e coluna1 deve ser alterado. ex-spark.sql ("selecione conversão (coluna1 como dupla) column1NewName, coluna2 da tabela") No lugar de dupla, escreva seu tipo de dados.

Tejasvi Sharma
fonte
1

Caso você precise renomear dezenas de colunas fornecidas pelo nome, o exemplo a seguir adota a abordagem de @dnlbrky e a aplica a várias colunas ao mesmo tempo:

df.selectExpr(df.columns.map(cn => {
    if (Set("speed", "weight", "height").contains(cn)) s"cast($cn as double) as $cn"
    else if (Set("isActive", "hasDevice").contains(cn)) s"cast($cn as boolean) as $cn"
    else cn
}):_*)

As colunas não transmitidas são mantidas inalteradas. Todas as colunas permanecem na ordem original.

alface cúbica
fonte
1

Tantas respostas e poucas explicações completas

A sintaxe a seguir funciona Usando o Databricks Notebook com Spark 2.4

from pyspark.sql.functions import *
df = df.withColumn("COL_NAME", to_date(BLDFm["LOAD_DATE"], "MM-dd-yyyy"))

Observe que você precisa especificar o formato de entrada que possui (no meu caso "MM-dd-aaaa") e a importação é obrigatória, pois o to_date é uma função spark sql

Também tentei essa sintaxe, mas obtive nulos em vez de uma conversão adequada:

df = df.withColumn("COL_NAME", df["COL_NAME"].cast("Date"))

(Observe que eu tive que usar colchetes e aspas para que ela seja sintaxicamente correta)


PS: Eu tenho que admitir que isso é como uma selva de sintaxe, existem muitas maneiras possíveis de pontos de entrada e as referências oficiais da API não têm exemplos adequados.

Mehdi LAMRANI
fonte
1
Selva de sintaxe. Sim. Este é o mundo do Spark agora.
Conner.xyz 5/11
1

Outra solução é a seguinte:

1) Mantenha "inferSchema" como False

2) Ao executar as funções 'Map' na linha, você pode ler 'asString' (row.getString ...)

//Read CSV and create dataset
Dataset<Row> enginesDataSet = sparkSession
            .read()
            .format("com.databricks.spark.csv")
            .option("header", "true")
            .option("inferSchema","false")
            .load(args[0]);

JavaRDD<Box> vertices = enginesDataSet
            .select("BOX","BOX_CD")
            .toJavaRDD()
            .map(new Function<Row, Box>() {
                @Override
                public Box call(Row row) throws Exception {
                    return new Box((String)row.getString(0),(String)row.get(1));
                }
            });
Vibha
fonte
0
    val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd
    //Schema to be applied to the table
    val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType)

    val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates()
Aravind Krishnakumar
fonte
0

Outra maneira:

// Generate a simple dataset containing five values and convert int to string type

val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value")
user8106134
fonte
0

Caso você queira alterar várias colunas de um tipo específico para outro sem especificar nomes de colunas individuais

/* Get names of all columns that you want to change type. 
In this example I want to change all columns of type Array to String*/
    val arrColsNames = originalDataFrame.schema.fields.filter(f => f.dataType.isInstanceOf[ArrayType]).map(_.name)

//iterate columns you want to change type and cast to the required type
val updatedDataFrame = arrColsNames.foldLeft(originalDataFrame){(tempDF, colName) => tempDF.withColumn(colName, tempDF.col(colName).cast(DataTypes.StringType))}

//display

updatedDataFrame.show(truncate = false)
Ravi
fonte