É possível salvar DataFrame
no Spark diretamente no Hive?
Eu tentei com a conversão DataFrame
para Rdd
e, em seguida, salvar como um arquivo de texto e, em seguida, carregando no colmeia. Mas estou me perguntando se posso salvar diretamente dataframe
para a colmeia
scala
apache-spark
hive
apache-spark-sql
Gourav
fonte
fonte
temporary
mesa com ahive
mesa? Ao fazershow tables
isso, inclui apenas ashive
tabelas para minhaspark 2.3.0
instalaçãoUse
DataFrameWriter.saveAsTable
. (df.write.saveAsTable(...)
) Consulte o Guia do Spark SQL e DataFrame .fonte
df.write().saveAsTable(tableName)
também gravará dados de streaming na tabela?Não vejo
df.write.saveAsTable(...)
obsoleto na documentação do Spark 2.0. Funcionou para nós no Amazon EMR. Fomos perfeitamente capazes de ler dados do S3 em um dataframe, processá-los, criar uma tabela a partir do resultado e lê-los com a MicroStrategy. A resposta de Vinays também funcionou.fonte
você precisa ter / criar um HiveContext
import org.apache.spark.sql.hive.HiveContext; HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
Em seguida, salve diretamente o dataframe ou selecione as colunas para armazenar como tabela de colmeia
df é dataframe
df.write().mode("overwrite").saveAsTable("schemaName.tableName");
ou
df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");
ou
df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");
SaveModes são Append / Ignore / Overwrite / ErrorIfExists
Eu adicionei aqui a definição para HiveContext da Documentação do Spark,
Além do SQLContext básico, você também pode criar um HiveContext, que fornece um superconjunto da funcionalidade fornecida pelo SQLContext básico. Recursos adicionais incluem a capacidade de escrever consultas usando o analisador HiveQL mais completo, acesso a UDFs Hive e a capacidade de ler dados de tabelas Hive. Para usar um HiveContext, você não precisa ter uma configuração de Hive existente e todas as fontes de dados disponíveis para um SQLContext ainda estão disponíveis. O HiveContext só é empacotado separadamente para evitar a inclusão de todas as dependências do Hive no build padrão do Spark.
no Spark versão 1.6.2, usar "dbName.tableName" dá este erro:
fonte
df.write().mode...
precisa ser alterado paradf.write.mode...
Salvar no Hive é apenas uma questão de usar o
write()
método do seu SQLContext:Vejo https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameWriter.html#saveAsTable(java.lang.String)
No Spark 2.2: use DataSet em vez de DataFrame.
fonte
From Spark 2.2: use DataSet instead DataFrame.
Desculpe escrever tarde para o post, mas não vejo nenhuma resposta aceita.
df.write().saveAsTable
irá lançarAnalysisException
e não é compatível com a tabela HIVE.Armazenar DF
df.write().format("hive")
deve resolver!Porém, se isso não funcionar, indo pelos comentários e respostas anteriores, esta é a melhor solução na minha opinião (no entanto, aberto a sugestões).
A melhor abordagem é criar explicitamente a tabela HIVE (incluindo a tabela PARTITIONED),
def createHiveTable: Unit ={ spark.sql("CREATE TABLE $hive_table_name($fields) " + "PARTITIONED BY ($partition_column String) STORED AS $StorageType") }
salvar DF como tabela temporária,
df.createOrReplaceTempView("$tempTableName")
e insira na tabela PARTITIONED HIVE:
spark.sql("insert into table default.$hive_table_name PARTITION($partition_column) select * from $tempTableName") spark.sql("select * from default.$hive_table_name").show(1000,false)
Claro que a ÚLTIMA COLUNA em DF será a COLUNA DE PARTIÇÃO então crie a tabela HIVE de acordo!
Por favor, comente se funciona! ou não.
--ATUALIZAR--
df.write() .partitionBy("$partition_column") .format("hive") .mode(SaveMode.append) .saveAsTable($new_table_name_to_be_created_in_hive) //Table should not exist OR should be a PARTITIONED table in HIVE
fonte
Aqui está a versão do PySpark para criar uma mesa Hive a partir do arquivo parquet. Você pode ter gerado arquivos Parquet usando o esquema inferido e agora deseja enviar a definição para o metastore do Hive. Você também pode enviar definição para o sistema como AWS Glue ou AWS Athena e não apenas para Hive metastore. Aqui estou usando spark.sql para enviar / criar tabela permanente.
# Location where my parquet files are present. df = spark.read.parquet("s3://my-location/data/") cols = df.dtypes buf = [] buf.append('CREATE EXTERNAL TABLE test123 (') keyanddatatypes = df.dtypes sizeof = len(df.dtypes) print ("size----------",sizeof) count=1; for eachvalue in keyanddatatypes: print count,sizeof,eachvalue if count == sizeof: total = str(eachvalue[0])+str(' ')+str(eachvalue[1]) else: total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',') buf.append(total) count = count + 1 buf.append(' )') buf.append(' STORED as parquet ') buf.append("LOCATION") buf.append("'") buf.append('s3://my-location/data/') buf.append("'") buf.append("'") ##partition by pt tabledef = ''.join(buf) print "---------print definition ---------" print tabledef ## create a table using spark.sql. Assuming you are using spark 2.1+ spark.sql(tabledef);
fonte
Para tabelas externas do Hive, uso esta função no PySpark:
def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"): print("Saving result in {}.{}".format(database, table_name)) output_schema = "," \ .join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) \ .replace("StringType", "STRING") \ .replace("IntegerType", "INT") \ .replace("DateType", "DATE") \ .replace("LongType", "INT") \ .replace("TimestampType", "INT") \ .replace("BooleanType", "BOOLEAN") \ .replace("FloatType", "FLOAT")\ .replace("DoubleType","FLOAT") output_schema = re.sub(r'DecimalType[(][0-9]+,[0-9]+[)]', 'FLOAT', output_schema) sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name)) query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION '/user/hive/{}/{}'" \ .format(database, table_name, output_schema, save_format, database, table_name) sparkSession.sql(query) dataframe.write.insertInto('{}.{}'.format(database, table_name),overwrite = True)
fonte
No meu caso, isso funciona bem:
from pyspark_llap import HiveWarehouseSession hive = HiveWarehouseSession.session(spark).build() hive.setDatabase("DatabaseName") df = spark.read.format("csv").option("Header",True).load("/user/csvlocation.csv") df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table",<tablename>).save()
Feito!!
Você pode ler os dados, deixá-lo dar como "funcionário"
hive.executeQuery("select * from Employee").show()
Para obter mais detalhes, use este URL: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive-read-write-operations.html
fonte
Você poderia usar a biblioteca spark-llap da Hortonworks assim
import com.hortonworks.hwc.HiveWarehouseSession df.write .format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector") .mode("append") .option("table", "myDatabase.myTable") .save()
fonte
val df = ... val schemaStr = df.schema.toDDL # This gives the columns spark.sql(s"""create table hive_table ( ${schemaStr})""") //Now write the dataframe to the table df.write.saveAsTable("hive_table")
hive_table
será criado no espaço padrão, pois não fornecemos nenhum banco de dados emspark.sql()
.stg.hive_table
pode ser usado para criarhive_table
nostg
banco de dados.fonte