Você não pode adicionar uma coluna arbitrária a um DataFrame
no Spark. Novas colunas podem ser criadas apenas usando literais (outros tipos literais são descritos em Como adicionar uma coluna constante em um Spark DataFrame? )
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
transformando uma coluna existente:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
incluído usando join
:
from pyspark.sql.functions import exp
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
.join(lookup, col("x1") == col("k"), "leftouter")
.drop("k")
.withColumnRenamed("v", "x6"))
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
ou gerado com a função / udf:
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
As funções internas ( pyspark.sql.functions
), que são mapeadas para a expressão Catalyst, geralmente são preferidas às funções definidas pelo usuário do Python.
Se você deseja adicionar conteúdo de um RDD arbitrário como uma coluna, pode
Para adicionar uma coluna usando um UDF:
fonte
Para Spark 2.0
fonte
df = df.select('*', (df.age + 10).alias('agePlusTen'))
você está efetivamente adicionando uma coluna arbitrária como @ zero323 advertiu-nos em cima, era impossível, a menos que haja algo de errado em fazer isso em Spark, em Pandas é a maneira padrão ..df.select('*', df.age + 10, df.age + 20)
Existem várias maneiras de adicionar uma nova coluna no pySpark.
Vamos primeiro criar um DataFrame simples.
Agora vamos tentar dobrar o valor da coluna e armazená-lo em uma nova coluna. PFB algumas abordagens diferentes para alcançar o mesmo.
Para mais exemplos e explicações sobre as funções do Spark DataFrame, você pode visitar meu blog .
Eu espero que isso ajude.
fonte
Você pode definir um novo
udf
ao adicionar umcolumn_name
:fonte
fonte
StringType()
.Gostaria de oferecer um exemplo generalizado para um caso de uso muito semelhante:
Caso de Uso: Eu tenho um csv que consiste em:
Eu preciso realizar algumas transformações e o csv final precisa parecer
Preciso fazer isso porque esse é o esquema definido por algum modelo e preciso que meus dados finais sejam interoperáveis com as inserções em massa do SQL e outras coisas.
tão:
1) Li o csv original usando spark.read e chamo-o de "df".
2) Faço algo com os dados.
3) eu adiciono as colunas nulas usando este script:
Dessa forma, você pode estruturar seu esquema após carregar um csv (também funcionaria para reordenar colunas, se você precisar fazer isso para muitas tabelas).
fonte
A maneira mais simples de adicionar uma coluna é usar "withColumn". Como o dataframe é criado usando o sqlContext, é necessário especificar o esquema ou, por padrão, pode estar disponível no conjunto de dados. Se o esquema for especificado, a carga de trabalho se tornará tediosa ao mudar sempre.
Abaixo está um exemplo que você pode considerar:
fonte
Podemos adicionar colunas adicionais ao DataFrame diretamente com as etapas abaixo:
fonte