Grave um único arquivo CSV usando spark-csv

Respostas:

168

Está criando uma pasta com vários arquivos, pois cada partição é salva individualmente. Se precisar de um único arquivo de saída (ainda em uma pasta), você pode repartition(de preferência se os dados upstream forem grandes, mas exigem uma ordem aleatória):

df
   .repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

ou coalesce:

df
   .coalesce(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

quadro de dados antes de salvar:

Todos os dados serão gravados mydata.csv/part-00000. Antes de usar esta opção, certifique-se de entender o que está acontecendo e qual é o custo de transferir todos os dados para um único trabalhador . Se você usar o sistema de arquivos distribuído com replicação, os dados serão transferidos várias vezes - primeiro buscados em um único trabalhador e posteriormente distribuídos pelos nós de armazenamento.

Como alternativa, você pode deixar seu código como está e usar ferramentas de uso geral como catou HDFSgetmerge para simplesmente mesclar todas as partes posteriormente.

zero323
fonte
6
você também pode usar o coalesce: df.coalesce (1) .write.format ("com.databricks.spark.csv") .option ("header", "true") .save ("mydata.csv")
ravi
O spark 1.6 gera um erro quando definimos .coalesce(1)que diz alguma FileNotFoundException no diretório _temporary. Ainda é um bug no Spark
Harsha
@Harsha Improvável. Em vez disso, um simples resultado de coalesce(1)ser muito caro e geralmente não prático.
zero323
Concordo @ zero323, mas se você tiver um requisito especial para consolidar em um arquivo, ainda deve ser possível, visto que você tem recursos e tempo suficientes.
Harsha
2
@Harsha Não digo que não. Se você ajustar o GC corretamente, deve funcionar bem, mas é simplesmente uma perda de tempo e provavelmente prejudicará o desempenho geral. Então, pessoalmente, não vejo nenhum motivo para me preocupar, especialmente porque é trivialmente simples mesclar arquivos fora do Spark sem me preocupar com o uso de memória.
zero323
36

Se você estiver executando o Spark com HDFS, resolvi o problema gravando arquivos csv normalmente e aproveitando o HDFS para fazer a fusão. Estou fazendo isso diretamente no Spark (1.6):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}


val newData = << create your dataframe >>


val outputfile = "/user/feeds/project/outputs/subject"  
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename 
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob  = outputFileName

    newData.write
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .mode("overwrite")
        .save(outputFileName)
    merge(mergeFindGlob, mergedFileName )
    newData.unpersist()

Não me lembro onde aprendi esse truque, mas pode funcionar para você.

Minkymorgan
fonte
Eu não tentei - e suspeito que pode não ser simples.
Minkymorgan
1
Obrigado. Eu adicionei uma resposta que funciona em Databricks
Josiah Yoder
@Minkymorgan eu tenho um problema semelhante, mas não é capaz de fazê-lo corretamente .. Você pode olhar para esta questão stackoverflow.com/questions/46812388/…
SUDARSHAN
4
@SUDARSHAN Minha função acima funciona com dados não compactados. No seu exemplo, acho que você está usando a compactação gzip enquanto grava os arquivos - e depois - tentando mesclá-los, o que falha. Isso não vai funcionar, já que você não pode mesclar arquivos gzip. Gzip não é um algoritmo de compactação divisível, então certamente não é "mesclável". Você pode testar a compactação "rápida" ou "bz2" - mas a sensação é que isso irá falhar também na fusão. Provavelmente, o melhor é remover a compactação, mesclar os arquivos brutos e, a seguir, compactar usando um codec divisível.
Minkymorgan
e se eu quiser preservar o cabeçalho? ele duplica para cada parte do arquivo
Normal
32

Posso estar um pouco atrasado para o jogo aqui, mas usar coalesce(1)ou repartition(1)pode funcionar para pequenos conjuntos de dados, mas grandes conjuntos de dados seriam todos colocados em uma partição em um nó. Isso provavelmente gerará erros OOM ou, na melhor das hipóteses, processará lentamente.

Eu sugiro que você use a FileUtil.copyMerge()função da API do Hadoop. Isso irá mesclar as saídas em um único arquivo.

EDITAR - Isso efetivamente traz os dados para o driver em vez de para um nó executor. Coalesce()seria ótimo se um único executor tivesse mais RAM para uso do que o driver.

EDIT 2 : copyMerge()está sendo removido no Hadoop 3.0. Consulte o seguinte artigo de estouro de pilha para obter mais informações sobre como trabalhar com a versão mais recente: Como fazer CopyMerge no Hadoop 3.0?

etspaceman
fonte
Alguma ideia de como obter um csv com uma linha de cabeçalho dessa maneira? Não gostaria que o arquivo produzisse um cabeçalho, pois isso intercalaria os cabeçalhos por todo o arquivo, um para cada partição.
nojo
Há uma opção que usei no passado documentada aqui: markhneedham.com/blog/2014/11/30/…
etspaceman
@etspaceman Cool. Ainda não tenho uma boa maneira de fazer isso, infelizmente, pois preciso ser capaz de fazer isso em Java (ou Spark, mas de uma forma que não consuma muita memória e possa trabalhar com arquivos grandes) . Eu ainda não consigo acreditar que eles removeram esta chamada de API ... este é um uso muito comum, mesmo que não seja exatamente usado por outros aplicativos no ecossistema Hadoop.
woot
20

Se você estiver usando Databricks e puder colocar todos os dados na RAM em um trabalhador (e, portanto, pode usar .coalesce(1)), pode usar dbfs para localizar e mover o arquivo CSV resultante:

val fileprefix= "/mnt/aws/path/file-prefix"

dataset
  .coalesce(1)       
  .write             
//.mode("overwrite") // I usually don't use this, but you may want to.
  .option("header", "true")
  .option("delimiter","\t")
  .csv(fileprefix+".tmp")

val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
     .filter(file=>file.name.endsWith(".csv"))(0).path

dbutils.fs.cp(partition_path,fileprefix+".tab")

dbutils.fs.rm(fileprefix+".tmp",recurse=true)

Se o seu arquivo não couber na RAM do trabalhador, você pode considerar a sugestão do chaotic3quilibrium de usar FileUtils.copyMerge () . Não fiz isso e ainda não sei se é possível ou não, por exemplo, no S3.

Esta resposta é construída em respostas anteriores a esta pergunta, bem como meus próprios testes do trecho de código fornecido. Eu originalmente postei no Databricks e estou republicando-o aqui.

A melhor documentação para a opção recursiva do dbfs rm que encontrei está em um fórum do Databricks .

Josiah Yoder
fonte
3

Uma solução que funciona para S3 modificado de Minkymorgan.

Simplesmente passe o caminho do diretório particionado temporário (com nome diferente do caminho final) como o srcPathcsv / txt final único como destPath Especifique também deleteSourcese quiser remover o diretório original.

/**
* Merges multiple partitions of spark text file output into single file. 
* @param srcPath source directory of partitioned files
* @param dstPath output path of individual path
* @param deleteSource whether or not to delete source directory after merging
* @param spark sparkSession
*/
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit =  {
  import org.apache.hadoop.fs.FileUtil
  import java.net.URI
  val config = spark.sparkContext.hadoopConfiguration
  val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
  FileUtil.copyMerge(
    fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
  )
}
John Zhu
fonte
A implementação de copyMerge lista todos os arquivos e itera sobre eles, isso não é seguro no s3. se você gravar seus arquivos e depois listá-los - isso não garante que todos eles serão listados. veja [isto | docs.aws.amazon.com/AmazonS3/latest/dev/…
LiranBo
3

A df.write()API do spark criará arquivos de várias partes dentro de determinado caminho ... para forçar o uso df.coalesce(1).write.csv(...)do Spark a escrever apenas um arquivo de parte em vez de df.repartition(1).write.csv(...)como coalescer é uma transformação estreita, enquanto que reparticionar é uma transformação ampla, consulte Spark - repartition () vs coalesce ()

df.coalesce(1).write.csv(filepath,header=True) 

irá criar a pasta em determinado caminho de arquivo com um part-0001-...-c000.csvuso de arquivo

cat filepath/part-0001-...-c000.csv > filename_you_want.csv 

ter um nome de arquivo amigável

pprasad009
fonte
alternativamente, se o dataframe não for muito grande (~ GBs ou caber na memória do driver), você também pode usar df.toPandas().to_csv(path)isso para gravar csv único com seu nome de arquivo preferido
pprasad009
1
Ugh, é tão frustrante como isso só pode ser feito convertendo-se em pandas. Quão difícil é apenas escrever um arquivo sem algum UUID nele?
ijoseph
2

reparticionar / coalescer em 1 partição antes de salvar (você ainda obteria uma pasta, mas teria um arquivo de parte nela)

Arnon Rotem-Gal-Oz
fonte
2

você pode usar rdd.coalesce(1, true).saveAsTextFile(path)

ele armazenará dados como um arquivo único no caminho / parte-00000

Gourav
fonte
1
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql.{DataFrame,SaveMode,SparkSession}
import org.apache.spark.sql.functions._

Resolvi usando a abordagem abaixo (hdfs renomear nome de arquivo): -

Etapa 1: - (Criar quadro de dados e gravar no HDFS)

df.coalesce(1).write.format("csv").option("header", "false").mode(SaveMode.Overwrite).save("/hdfsfolder/blah/")

Etapa 2: - (Criar configuração do Hadoop)

val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)

Etapa 3: - (Obter caminho no caminho da pasta hdfs)

val pathFiles = new Path("/hdfsfolder/blah/")

Passo 4: - (Obtenha os nomes dos arquivos spark da pasta hdfs)

val fileNames = hdfs.listFiles(pathFiles, false)
println(fileNames)

setp5: - (crie uma lista mutável do scala para salvar todos os nomes de arquivo e adicioná-lo à lista)

    var fileNamesList = scala.collection.mutable.MutableList[String]()
    while (fileNames.hasNext) {
      fileNamesList += fileNames.next().getPath.getName
    }
    println(fileNamesList)

Etapa 6: - (filtrar a ordem do arquivo _SUCESS da lista de scala de nomes de arquivo)

    // get files name which are not _SUCCESS
    val partFileName = fileNamesList.filterNot(filenames => filenames == "_SUCCESS")

passo 7: - (converter a lista scala em string e adicionar o nome do arquivo desejado à string da pasta hdfs e então aplicar a renomeação)

val partFileSourcePath = new Path("/yourhdfsfolder/"+ partFileName.mkString(""))
    val desiredCsvTargetPath = new Path(/yourhdfsfolder/+ "op_"+ ".csv")
    hdfs.rename(partFileSourcePath , desiredCsvTargetPath)
Sri Hari Kali Charan Tummala
fonte
1

Estou usando isso em Python para obter um único arquivo:

df.toPandas().to_csv("/tmp/my.csv", sep=',', header=True, index=False)
Kees C. Bakker
fonte
1

Esta resposta expande a resposta aceita, fornece mais contexto e fornece trechos de código que você pode executar no Spark Shell em sua máquina.

Mais contexto na resposta aceita

A resposta aceita pode dar a impressão de que o código de amostra gera um único mydata.csvarquivo, mas não é esse o caso. Vamos demonstrar:

val df = Seq("one", "two", "three").toDF("num")
df
  .repartition(1)
  .write.csv(sys.env("HOME")+ "/Documents/tmp/mydata.csv")

Aqui está o que é gerado:

Documents/
  tmp/
    mydata.csv/
      _SUCCESS
      part-00000-b3700504-e58b-4552-880b-e7b52c60157e-c000.csv

NB mydata.csvé uma pasta na resposta aceita - não é um arquivo!

Como gerar um único arquivo com um nome específico

Podemos usar o spark-daria para escrever um único mydata.csvarquivo.

import com.github.mrpowers.spark.daria.sql.DariaWriters
DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = sys.env("HOME") + "/Documents/better/staging",
    filename = sys.env("HOME") + "/Documents/better/mydata.csv"
)

Isso produzirá o arquivo da seguinte maneira:

Documents/
  better/
    mydata.csv

Caminhos S3

Você precisará passar caminhos s3a DariaWriters.writeSingleFilepara usar este método em S3:

DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = "s3a://bucket/data/src",
    filename = "s3a://bucket/data/dest/my_cool_file.csv"
)

Veja aqui para mais informações.

Evitando copyMerge

copyMerge foi removido do Hadoop 3. A DariaWriters.writeSingleFileimplementação usa fs.rename, conforme descrito aqui . O Spark 3 ainda usa o Hadoop 2 , então as implementações de copyMerge funcionarão em 2020. Não tenho certeza de quando o Spark atualizará para o Hadoop 3, mas é melhor evitar qualquer abordagem de copyMerge que fará seu código quebrar quando o Spark atualizar o Hadoop.

Código fonte

Procure o DariaWritersobjeto no código-fonte do spark-daria se quiser inspecionar a implementação.

Implementação PySpark

É mais fácil gravar um único arquivo com o PySpark porque você pode converter o DataFrame em um Pandas DataFrame que é gravado como um único arquivo por padrão.

from pathlib import Path
home = str(Path.home())
data = [
    ("jellyfish", "JALYF"),
    ("li", "L"),
    ("luisa", "LAS"),
    (None, None)
]
df = spark.createDataFrame(data, ["word", "expected"])
df.toPandas().to_csv(home + "/Documents/tmp/mydata-from-pyspark.csv", sep=',', header=True, index=False)

Limitações

A DariaWriters.writeSingleFileabordagem Scala e a df.toPandas()abordagem Python funcionam apenas para pequenos conjuntos de dados. Grandes conjuntos de dados não podem ser gravados como arquivos únicos. Gravar dados como um único arquivo não é ideal de uma perspectiva de desempenho porque os dados não podem ser gravados em paralelo.

Poderes
fonte
0

usando Listbuffer, podemos salvar dados em um único arquivo:

import java.io.FileWriter
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
    val text = spark.read.textFile("filepath")
    var data = ListBuffer[String]()
    for(line:String <- text.collect()){
      data += line
    }
    val writer = new FileWriter("filepath")
    data.foreach(line => writer.write(line.toString+"\n"))
    writer.close()
siddhu salvi
fonte
-2

Existe mais uma maneira de usar Java

import java.io._

def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) 
  {
     val p = new java.io.PrintWriter(f);  
     try { op(p) } 
     finally { p.close() }
  } 

printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}
Sergio Alyoshkin
fonte
nome 'true' não está definido
Arron