Gostaria de ler um CSV no spark e convertê-lo como DataFrame e armazená-lo no HDFS com df.registerTempTable("table_name")
Eu tentei:
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
Erro que recebi:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Qual é o comando certo para carregar o arquivo CSV como DataFrame no Apache Spark?
scala
apache-spark
hadoop
apache-spark-sql
hdfs
Donbeo
fonte
fonte
Respostas:
O spark-csv faz parte da funcionalidade principal do Spark e não requer uma biblioteca separada. Então você poderia fazer, por exemplo
Em scala, (isso funciona para qualquer menção de delimitador de formato "," para csv, "\ t" para tsv etc.)
val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")
fonte
Analisar CSV e carregar como DataFrame / DataSet com Spark 2.x
Primeiro, inicialize o
SparkSession
objeto por padrão, ele estará disponível em shells comospark
1. Faça isso de maneira programática
Atualização: adicionando todas as opções daqui caso o link seja quebrado no futuro
2. Você também pode fazer isso da maneira SQL
Dependências :
Versão Spark <2.0
Dependências:
fonte
spark-core_2.11
espark-sql_2.11
da2.0.1
versão está bem. Se possível, adicione a mensagem de erro.spark.read.format("csv").option("delimiter ", "|") ...
programmatic way
é deixar de fora.format("csv")
e substituir.load(...
por.csv(...
. Ooption
método pertence à classe DataFrameReader conforme retornada peloread
método, onde os métodosload
ecsv
retornam um dataframe, portanto, não é possível ter opções marcadas após serem chamadas. Esta resposta é bastante completa, mas você deve vincular a documentação para que as pessoas possam ver todas as outras opções de CSV disponíveis spark.apache.org/docs/latest/api/scala/… *): org.apache.spark.sql.DataFrameÉ para quem o Hadoop é 2.6 e o Spark é 1.6 e sem o pacote "databricks".
fonte
Com o Spark 2.0, a seguir é como você pode ler CSV
fonte
spark.read.csv(path)
espark.read.format("csv").load(path)
?Em Java 1.8 Este trecho de código funciona perfeitamente para ler arquivos CSV
POM.xml
Java
fonte
Existem muitos desafios para analisar um arquivo CSV, ele continua aumentando se o tamanho do arquivo for maior, se houver caracteres em inglês / escape / separator / outros nos valores da coluna, que podem causar erros de análise.
A mágica está nas opções usadas. Os que funcionaram para mim e espero que devam cobrir a maioria dos casos extremos estão no código abaixo:
Espero que ajude. Para mais informações, consulte: Usando PySpark 2 para ler CSV com código-fonte HTML
Nota: O código acima é da API do Spark 2, onde a API de leitura de arquivo CSV é fornecida com pacotes integrados do Spark instalável.
Nota: PySpark é um wrapper Python para Spark e compartilha a mesma API que Scala / Java.
fonte
O exemplo do Penny's Spark 2 é a maneira de fazê-lo no spark2. Há mais um truque: tenha esse cabeçalho gerado para você fazendo uma varredura inicial dos dados, definindo a opção
inferSchema
comotrue
Aqui, supondo que
spark
seja uma sessão spark que você configurou, está a operação a carregar no arquivo de índice CSV de todas as imagens do Landsat hospedadas pela Amazon no S3.A má notícia é: isso desencadeia uma varredura no arquivo; para algo grande como esse arquivo CSV compactado com mais de 20 MB, que pode levar 30 segundos em uma conexão de longo curso. Lembre-se: é melhor codificar manualmente o esquema depois que ele chegar.
(trecho de código Apache Software License 2.0 licenciado para evitar toda ambiguidade; algo que fiz como teste de demonstração / integração da integração com o S3)
fonte
Caso você esteja construindo um jar com o scala 2.11 e o Apache 2.0 ou superior.
Não há necessidade de criar um
sqlContext
ousparkContext
objeto. Apenas umSparkSession
objeto é suficiente para todas as necessidades.A seguir está o meu código, que funciona bem:
No caso de você estar executando no cluster, mude
.master("local")
para.master("yarn")
enquanto define osparkBuilder
objetoO Spark Doc cobre isso: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
fonte
Adicione as seguintes dependências do Spark ao arquivo POM:
// Configuração do Spark:
val spark = SparkSession.builder (). master ("local"). appName ("Sample App"). getOrCreate ()
// Leia o arquivo csv:
val df = spark.read.option ("cabeçalho", "true"). csv ("FILE_PATH")
// Exibir saída
df.show ()
fonte
Para ler do caminho relativo no sistema, use o método System.getProperty para obter o diretório atual e outros usos para carregar o arquivo usando o caminho relativo.
faísca: 2.4.4 scala: 2.11.12
fonte
Com o Spark 2.4+, se você deseja carregar um csv de um diretório local, pode usar 2 sessões e carregá-lo no hive. A primeira sessão deve ser criada com a configuração master () como "local [*]" e a segunda sessão com "yarn" e o Hive habilitado.
O abaixo funcionou para mim.
Quando correu com
spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jar
ele correu bem e criou a tabela na colméia.fonte
O formato de arquivo padrão é Parquet com spark.read .. e a leitura de arquivo csv é por isso que você está recebendo a exceção. Especifique o formato csv com a API que você está tentando usar
fonte
Tente isso se estiver usando o spark 2.0+
Nota: - este trabalho para qualquer arquivo delimitado. Basta usar a opção (“delimitador”) para alterar o valor.
Espero que isso seja útil.
fonte
Com o Spark csv incorporado, você pode fazer isso facilmente com o novo objeto SparkSession para Spark> 2.0.
Existem várias opções que você pode definir.
header
: se seu arquivo inclui a linha do cabeçalho na parte superiorinferSchema
: se você deseja inferir o esquema automaticamente ou não. O padrão étrue
. Eu sempre prefiro fornecer esquema para garantir tipos de dados adequados.mode
: modo de análise, PERMISSIVE, DROPMALFORMED ou FAILFASTdelimiter
: para especificar delimitador, o padrão é vírgula (',')fonte