Está criando uma pasta com vários arquivos, pois cada partição é salva individualmente. Se precisar de um único arquivo de saída (ainda em uma pasta), você pode repartition
(de preferência se os dados upstream forem grandes, mas exigem uma ordem aleatória):
df
.repartition(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
ou coalesce
:
df
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
quadro de dados antes de salvar:
Todos os dados serão gravados mydata.csv/part-00000
. Antes de usar esta opção, certifique-se de entender o que está acontecendo e qual é o custo de transferir todos os dados para um único trabalhador . Se você usar o sistema de arquivos distribuído com replicação, os dados serão transferidos várias vezes - primeiro buscados em um único trabalhador e posteriormente distribuídos pelos nós de armazenamento.
Como alternativa, você pode deixar seu código como está e usar ferramentas de uso geral como cat
ou HDFSgetmerge
para simplesmente mesclar todas as partes posteriormente.
.coalesce(1)
que diz alguma FileNotFoundException no diretório _temporary. Ainda é um bug no Sparkcoalesce(1)
ser muito caro e geralmente não prático.Se você estiver executando o Spark com HDFS, resolvi o problema gravando arquivos csv normalmente e aproveitando o HDFS para fazer a fusão. Estou fazendo isso diretamente no Spark (1.6):
Não me lembro onde aprendi esse truque, mas pode funcionar para você.
fonte
Posso estar um pouco atrasado para o jogo aqui, mas usar
coalesce(1)
ourepartition(1)
pode funcionar para pequenos conjuntos de dados, mas grandes conjuntos de dados seriam todos colocados em uma partição em um nó. Isso provavelmente gerará erros OOM ou, na melhor das hipóteses, processará lentamente.Eu sugiro que você use a
FileUtil.copyMerge()
função da API do Hadoop. Isso irá mesclar as saídas em um único arquivo.EDITAR - Isso efetivamente traz os dados para o driver em vez de para um nó executor.
Coalesce()
seria ótimo se um único executor tivesse mais RAM para uso do que o driver.EDIT 2 :
copyMerge()
está sendo removido no Hadoop 3.0. Consulte o seguinte artigo de estouro de pilha para obter mais informações sobre como trabalhar com a versão mais recente: Como fazer CopyMerge no Hadoop 3.0?fonte
Se você estiver usando Databricks e puder colocar todos os dados na RAM em um trabalhador (e, portanto, pode usar
.coalesce(1)
), pode usar dbfs para localizar e mover o arquivo CSV resultante:Se o seu arquivo não couber na RAM do trabalhador, você pode considerar a sugestão do chaotic3quilibrium de usar FileUtils.copyMerge () . Não fiz isso e ainda não sei se é possível ou não, por exemplo, no S3.
Esta resposta é construída em respostas anteriores a esta pergunta, bem como meus próprios testes do trecho de código fornecido. Eu originalmente postei no Databricks e estou republicando-o aqui.
A melhor documentação para a opção recursiva do dbfs rm que encontrei está em um fórum do Databricks .
fonte
Uma solução que funciona para S3 modificado de Minkymorgan.
Simplesmente passe o caminho do diretório particionado temporário (com nome diferente do caminho final) como o
srcPath
csv / txt final único comodestPath
Especifique tambémdeleteSource
se quiser remover o diretório original.fonte
A
df.write()
API do spark criará arquivos de várias partes dentro de determinado caminho ... para forçar o usodf.coalesce(1).write.csv(...)
do Spark a escrever apenas um arquivo de parte em vez dedf.repartition(1).write.csv(...)
como coalescer é uma transformação estreita, enquanto que reparticionar é uma transformação ampla, consulte Spark - repartition () vs coalesce ()irá criar a pasta em determinado caminho de arquivo com um
part-0001-...-c000.csv
uso de arquivoter um nome de arquivo amigável
fonte
df.toPandas().to_csv(path)
isso para gravar csv único com seu nome de arquivo preferidoreparticionar / coalescer em 1 partição antes de salvar (você ainda obteria uma pasta, mas teria um arquivo de parte nela)
fonte
você pode usar
rdd.coalesce(1, true).saveAsTextFile(path)
ele armazenará dados como um arquivo único no caminho / parte-00000
fonte
Resolvi usando a abordagem abaixo (hdfs renomear nome de arquivo): -
Etapa 1: - (Criar quadro de dados e gravar no HDFS)
Etapa 2: - (Criar configuração do Hadoop)
Etapa 3: - (Obter caminho no caminho da pasta hdfs)
Passo 4: - (Obtenha os nomes dos arquivos spark da pasta hdfs)
setp5: - (crie uma lista mutável do scala para salvar todos os nomes de arquivo e adicioná-lo à lista)
Etapa 6: - (filtrar a ordem do arquivo _SUCESS da lista de scala de nomes de arquivo)
passo 7: - (converter a lista scala em string e adicionar o nome do arquivo desejado à string da pasta hdfs e então aplicar a renomeação)
fonte
Estou usando isso em Python para obter um único arquivo:
fonte
Esta resposta expande a resposta aceita, fornece mais contexto e fornece trechos de código que você pode executar no Spark Shell em sua máquina.
Mais contexto na resposta aceita
A resposta aceita pode dar a impressão de que o código de amostra gera um único
mydata.csv
arquivo, mas não é esse o caso. Vamos demonstrar:Aqui está o que é gerado:
NB
mydata.csv
é uma pasta na resposta aceita - não é um arquivo!Como gerar um único arquivo com um nome específico
Podemos usar o spark-daria para escrever um único
mydata.csv
arquivo.Isso produzirá o arquivo da seguinte maneira:
Caminhos S3
Você precisará passar caminhos s3a
DariaWriters.writeSingleFile
para usar este método em S3:Veja aqui para mais informações.
Evitando copyMerge
copyMerge foi removido do Hadoop 3. A
DariaWriters.writeSingleFile
implementação usafs.rename
, conforme descrito aqui . O Spark 3 ainda usa o Hadoop 2 , então as implementações de copyMerge funcionarão em 2020. Não tenho certeza de quando o Spark atualizará para o Hadoop 3, mas é melhor evitar qualquer abordagem de copyMerge que fará seu código quebrar quando o Spark atualizar o Hadoop.Código fonte
Procure o
DariaWriters
objeto no código-fonte do spark-daria se quiser inspecionar a implementação.Implementação PySpark
É mais fácil gravar um único arquivo com o PySpark porque você pode converter o DataFrame em um Pandas DataFrame que é gravado como um único arquivo por padrão.
Limitações
A
DariaWriters.writeSingleFile
abordagem Scala e adf.toPandas()
abordagem Python funcionam apenas para pequenos conjuntos de dados. Grandes conjuntos de dados não podem ser gravados como arquivos únicos. Gravar dados como um único arquivo não é ideal de uma perspectiva de desempenho porque os dados não podem ser gravados em paralelo.fonte
usando Listbuffer, podemos salvar dados em um único arquivo:
fonte
Existe mais uma maneira de usar Java
fonte