Como salvar o DataFrame diretamente no Hive?

85

É possível salvar DataFrameno Spark diretamente no Hive?

Eu tentei com a conversão DataFramepara Rdde, em seguida, salvar como um arquivo de texto e, em seguida, carregando no colmeia. Mas estou me perguntando se posso salvar diretamente dataframepara a colmeia

Gourav
fonte

Respostas:

118

Você pode criar uma tabela temporária na memória e armazená-los na tabela hive usando sqlContext.

Vamos dizer que seu quadro de dados é myDf. Você pode criar uma tabela temporária usando,

myDf.createOrReplaceTempView("mytempTable") 

Em seguida, você pode usar uma instrução hive simples para criar uma tabela e despejar os dados de sua tabela temporária.

sqlContext.sql("create table mytable as select * from mytempTable");
Vinay Kumar
fonte
2
isso contornou os erros de leitura de parquete que eu estava recebendo ao usar write.saveAsTable no spark 2.0
ski_squaw
2
Sim. No entanto, podemos usar a partição por no quadro de dados antes de criar a tabela temporária. @chhantyal
Vinay Kumar
1
Como você conseguiu misturar e combinar a temporarymesa com a hivemesa? Ao fazer show tablesisso, inclui apenas as hivetabelas para minha spark 2.3.0instalação
StephenBoesch
1
esta tabela temporária será salva em seu contexto de colmeia e não pertence a tabelas de colmeia de forma alguma.
Vinay Kumar
1
hi @VinayKumar porque você diz "Se você está usando saveAsTable (é mais como persistir seu dataframe), você deve ter certeza de que tem memória suficiente alocada para seu aplicativo spark". você poderia explicar este ponto?
enneppi
28

Use DataFrameWriter.saveAsTable. ( df.write.saveAsTable(...)) Consulte o Guia do Spark SQL e DataFrame .

Daniel Darabos
fonte
4
saveAsTable não cria tabelas compatíveis com Hive. A melhor solução que encontrei é de Vinay Kumar.
RChat
@Jacek: Eu mesmo adicionei esta nota, porque acho que minha resposta está errada. Eu iria excluí-lo, mas ele é aceito. Você acha que a nota está errada?
Daniel Darabos
Sim. A nota estava errada e foi por isso que a removi. "Por favor, corrija-me se eu estiver errado" se aplica aqui :)
Jacek Laskowski
1
isso df.write().saveAsTable(tableName) também gravará dados de streaming na tabela?
user1870400
1
não, você não pode salvar dados de streaming com saveAsTable, nem mesmo está na api
Brian
21

Não vejo df.write.saveAsTable(...)obsoleto na documentação do Spark 2.0. Funcionou para nós no Amazon EMR. Fomos perfeitamente capazes de ler dados do S3 em um dataframe, processá-los, criar uma tabela a partir do resultado e lê-los com a MicroStrategy. A resposta de Vinays também funcionou.

Alex
fonte
5
Alguém sinalizou esta resposta como de baixa qualidade devido ao comprimento e conteúdo. Para ser honesto, provavelmente teria sido melhor como um comentário. Eu acho que já está no ar há dois anos e algumas pessoas acharam útil, então seria bom deixar as coisas como estão.
serakfalcon
Eu concordo, o comentário teria sido a melhor escolha. Lição aprendida :-)
Alex
15

você precisa ter / criar um HiveContext

import org.apache.spark.sql.hive.HiveContext;

HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());

Em seguida, salve diretamente o dataframe ou selecione as colunas para armazenar como tabela de colmeia

df é dataframe

df.write().mode("overwrite").saveAsTable("schemaName.tableName");

ou

df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");

ou

df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");

SaveModes são Append / Ignore / Overwrite / ErrorIfExists

Eu adicionei aqui a definição para HiveContext da Documentação do Spark,

Além do SQLContext básico, você também pode criar um HiveContext, que fornece um superconjunto da funcionalidade fornecida pelo SQLContext básico. Recursos adicionais incluem a capacidade de escrever consultas usando o analisador HiveQL mais completo, acesso a UDFs Hive e a capacidade de ler dados de tabelas Hive. Para usar um HiveContext, você não precisa ter uma configuração de Hive existente e todas as fontes de dados disponíveis para um SQLContext ainda estão disponíveis. O HiveContext só é empacotado separadamente para evitar a inclusão de todas as dependências do Hive no build padrão do Spark.


no Spark versão 1.6.2, usar "dbName.tableName" dá este erro:

org.apache.spark.sql.AnalysisException: Especificar o nome do banco de dados ou outros qualificadores não são permitidos para tabelas temporárias. Se o nome da tabela tiver pontos (.), Indique o nome da tabela com crostas () .`

Anandkumar
fonte
É o segundo comando: 'df.select (df.col ("col1"), df.col ("col2"), df.col ("col3")) .write (). Mode ("overwrite"). SaveAsTable ("schemaName.tableName"); ' exigir que as colunas selecionadas que você pretende substituir já existam na tabela? Então você tem a tabela existente e só sobrescreve as colunas 1,2,3 existentes com os novos dados de seu df no spark? isso é interpretado certo?
dieHellste
3
df.write().mode...precisa ser alterado paradf.write.mode...
usuário 923227
8

Salvar no Hive é apenas uma questão de usar o write()método do seu SQLContext:

df.write.saveAsTable(tableName)

Vejo https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameWriter.html#saveAsTable(java.lang.String)

No Spark 2.2: use DataSet em vez de DataFrame.

Raktotpal Bordoloi
fonte
Parece que há um erro que afirma que Jó abortou. Tentei o seguinte código pyspark_df.write.mode ("overwrite"). SaveAsTable ("InjuryTab2")
Sade
Oi! porque isso? From Spark 2.2: use DataSet instead DataFrame.
onofricamila
3

Desculpe escrever tarde para o post, mas não vejo nenhuma resposta aceita.

df.write().saveAsTableirá lançar AnalysisExceptione não é compatível com a tabela HIVE.

Armazenar DF df.write().format("hive")deve resolver!

Porém, se isso não funcionar, indo pelos comentários e respostas anteriores, esta é a melhor solução na minha opinião (no entanto, aberto a sugestões).

A melhor abordagem é criar explicitamente a tabela HIVE (incluindo a tabela PARTITIONED),

def createHiveTable: Unit ={
spark.sql("CREATE TABLE $hive_table_name($fields) " +
  "PARTITIONED BY ($partition_column String) STORED AS $StorageType")
}

salvar DF ​​como tabela temporária,

df.createOrReplaceTempView("$tempTableName")

e insira na tabela PARTITIONED HIVE:

spark.sql("insert into table default.$hive_table_name PARTITION($partition_column) select * from $tempTableName")
spark.sql("select * from default.$hive_table_name").show(1000,false)

Claro que a ÚLTIMA COLUNA em DF será a COLUNA DE PARTIÇÃO então crie a tabela HIVE de acordo!

Por favor, comente se funciona! ou não.


--ATUALIZAR--

df.write()
  .partitionBy("$partition_column")
  .format("hive")
  .mode(SaveMode.append)
  .saveAsTable($new_table_name_to_be_created_in_hive)  //Table should not exist OR should be a PARTITIONED table in HIVE
DrthSprk_
fonte
1

Aqui está a versão do PySpark para criar uma mesa Hive a partir do arquivo parquet. Você pode ter gerado arquivos Parquet usando o esquema inferido e agora deseja enviar a definição para o metastore do Hive. Você também pode enviar definição para o sistema como AWS Glue ou AWS Athena e não apenas para Hive metastore. Aqui estou usando spark.sql para enviar / criar tabela permanente.

   # Location where my parquet files are present.
    df = spark.read.parquet("s3://my-location/data/")
    cols = df.dtypes
    buf = []
    buf.append('CREATE EXTERNAL TABLE test123 (')
    keyanddatatypes =  df.dtypes
    sizeof = len(df.dtypes)
    print ("size----------",sizeof)
    count=1;
    for eachvalue in keyanddatatypes:
        print count,sizeof,eachvalue
        if count == sizeof:
            total = str(eachvalue[0])+str(' ')+str(eachvalue[1])
        else:
            total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',')
        buf.append(total)
        count = count + 1

    buf.append(' )')
    buf.append(' STORED as parquet ')
    buf.append("LOCATION")
    buf.append("'")
    buf.append('s3://my-location/data/')
    buf.append("'")
    buf.append("'")
    ##partition by pt
    tabledef = ''.join(buf)

    print "---------print definition ---------"
    print tabledef
    ## create a table using spark.sql. Assuming you are using spark 2.1+
    spark.sql(tabledef);
kartik
fonte
1

Para tabelas externas do Hive, uso esta função no PySpark:

def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"):
    print("Saving result in {}.{}".format(database, table_name))
    output_schema = "," \
        .join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) \
        .replace("StringType", "STRING") \
        .replace("IntegerType", "INT") \
        .replace("DateType", "DATE") \
        .replace("LongType", "INT") \
        .replace("TimestampType", "INT") \
        .replace("BooleanType", "BOOLEAN") \
        .replace("FloatType", "FLOAT")\
        .replace("DoubleType","FLOAT")
    output_schema = re.sub(r'DecimalType[(][0-9]+,[0-9]+[)]', 'FLOAT', output_schema)

    sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name))

    query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION '/user/hive/{}/{}'" \
        .format(database, table_name, output_schema, save_format, database, table_name)
    sparkSession.sql(query)
    dataframe.write.insertInto('{}.{}'.format(database, table_name),overwrite = True)
Shadowtrooper
fonte
1

No meu caso, isso funciona bem:

from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("DatabaseName")
df = spark.read.format("csv").option("Header",True).load("/user/csvlocation.csv")
df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table",<tablename>).save()

Feito!!

Você pode ler os dados, deixá-lo dar como "funcionário"

hive.executeQuery("select * from Employee").show()

Para obter mais detalhes, use este URL: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive-read-write-operations.html

MD Rijwan
fonte
0

Você poderia usar a biblioteca spark-llap da Hortonworks assim

import com.hortonworks.hwc.HiveWarehouseSession

df.write
  .format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
  .mode("append")
  .option("table", "myDatabase.myTable")
  .save()
Mike
fonte
-1

Se você deseja criar uma tabela de colmeia (que não existe) a partir de um dataframe (às vezes falha na criação DataFrameWriter.saveAsTable) StructType.toDDLwill ajuda a listar as colunas como uma string.

val df = ...

val schemaStr = df.schema.toDDL # This gives the columns 
spark.sql(s"""create table hive_table ( ${schemaStr})""")

//Now write the dataframe to the table
df.write.saveAsTable("hive_table")

hive_tableserá criado no espaço padrão, pois não fornecemos nenhum banco de dados em spark.sql(). stg.hive_tablepode ser usado para criar hive_tableno stgbanco de dados.

Srsrinivas
fonte
Exemplo detalhado encontrado aqui: stackoverflow.com/a/56833395/1592191
mrsrinivas