Como sobrescrever o diretório de saída no spark

107

Eu tenho um aplicativo de streaming de Spark que produz um conjunto de dados para cada minuto. Preciso salvar / sobrescrever os resultados dos dados processados.

Quando tentei sobrescrever o dataset org.apache.hadoop.mapred.FileAlreadyExistsException para a execução.

Eu defino a propriedade Spark set("spark.files.overwrite","true"), mas não dá sorte.

Como sobrescrever ou Predelete os arquivos do Spark?

Vijay Innamuri
fonte
1
Sim, é uma merda, não é, eu considero uma regressão para 0.9.0. Aceite minha resposta :)
samthebest
set("spark.files.overwrite","true")funciona apenas para arquivos adicionados por meio despark.addFile()
aiman

Respostas:

106

ATUALIZAÇÃO: Sugira o uso Dataframes, além de algo parecido ... .write.mode(SaveMode.Overwrite) ....

Cafetão prático:

implicit class PimpedStringRDD(rdd: RDD[String]) {
    def write(p: String)(implicit ss: SparkSession): Unit = {
      import ss.implicits._
      rdd.toDF().as[String].write.mode(SaveMode.Overwrite).text(p)
    }
  }

Para versões mais antigas, tente

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)

Em 1.1.0 você pode definir as configurações de configuração usando o script spark-submit com a sinalização --conf.

AVISO (versões anteriores): De acordo com @piggybox, há um bug no Spark onde ele apenas sobrescreverá os arquivos necessários para gravá-los part-, todos os outros arquivos serão deixados sem remoção.

o melhor
fonte
29
Para Spark 1.4:df.write.mode(SaveMode.Overwrite).parquet(path)
Ha Pham
Para Spark SQL, você tem opções para definir o SaveMode para Core Spark, você não tem nada parecido. Gostaria muito de algum desse tipo de recurso para saveAsTextFile e outras transformações
Murtaza Kanchwala
3
Um problema oculto: comparando com a solução de @pzecevic de limpar a pasta inteira por meio do HDFS, nesta abordagem o Spark substituirá apenas os arquivos de parte com o mesmo nome de arquivo na pasta de saída. Isso funciona na maioria das vezes, mas se houver algo mais, como arquivos de partes extras de outro trabalho do Spark / Hadoop na pasta, isso não substituirá esses arquivos.
cofrinho
6
Você também pode usar o df.write.mode(mode: String).parquet(path)modo Where: String pode ser: "overwrite", "append", "ignore", "error".
centeio
1
@avocado Sim, acho que as APIs do Spark ficam cada vez piores a cada lançamento: P
samthebest
27

A documentação para o parâmetro spark.files.overwrite diz o seguinte: "Substituir arquivos adicionados por meio deSparkContext.addFile() quando o arquivo de destino existe e seu conteúdo não corresponde ao da fonte." Portanto, não tem efeito no método saveAsTextFiles.

Você pode fazer isso antes de salvar o arquivo:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }

Aas explicado aqui: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html

pzecevic
fonte
29
que tal para o pyspark?
javadba
A próxima resposta para usar 'write.mode (SaveMode.Overwrite)' é o caminho a percorrer
YaOg
O hdfs pode excluir os novos arquivos à medida que chegam, pois ainda está excluindo os antigos.
Jake
25

Na documentação pyspark.sql.DataFrame.save (atualmente em 1.3.1), você pode especificar mode='overwrite'ao salvar um DataFrame:

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')

Eu verifiquei que isso removerá até mesmo arquivos de partição restantes. Portanto, se você tinha, digamos, 10 partições / arquivos originalmente, mas substituiu a pasta com um DataFrame que tinha apenas 6 partições, a pasta resultante terá as 6 partições / arquivos.

Consulte a documentação do Spark SQL para obter mais informações sobre as opções de modo.

dnlbrky
fonte
2
Verdadeiro e útil, obrigado, mas uma solução específica de DataFrame - spark.hadoop.validateOutputSpecsfuncionará em todas as APIs do Spark.
samthebest
Por algum motivo, spark.hadoop.validateOutputSpecsnão funcionou para mim no 1.3, mas funciona.
Eric Walker,
1
@samthebest Com a save(... , mode=rota, você pode sobrescrever um conjunto de arquivos, anexar outro, etc. dentro do mesmo contexto do Spark. Não o spark.hadoop.validateOutputSpecslimitaria a apenas um modo por contexto?
dnlbrky
1
@dnlbrky O OP não pediu para anexar. Como eu disse, verdadeiro, útil, mas desnecessário. Se o OP perguntasse "como faço para anexar", uma ampla gama de respostas poderia ser fornecida. Mas não vamos entrar nisso. Também aconselho você a usar a versão Scala dos DataFrames, pois tem segurança de tipo e mais verificação - por exemplo, se você tivesse um erro de digitação em "sobrescrever", não descobriria até que o DAG fosse avaliado - o que em um trabalho de Big Data poderia 2 horas depois !! Se você usar a versão Scala, o compilador verificará tudo antecipadamente! Muito legal e muito importante para Big Data.
samthebest
15

df.write.mode('overwrite').parquet("/output/folder/path")funciona se você deseja sobrescrever um arquivo parquet usando python. Isso está no faísca 1.6.2. API pode ser diferente em versões posteriores

akn
fonte
Sim, isso funciona muito bem para o meu requisito (Databricks)
Nick.McDermaid
4
  val jobName = "WordCount";
  //overwrite the output directory in spark  set("spark.hadoop.validateOutputSpecs", "false")
  val conf = new 
  SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
  val sc = new SparkContext(conf)
vaquar khan
fonte
Apenas para Spark 1, na versão mais recente, usedf.write.mode(SaveMode.Overwrite)
ChikuMiku
3

Esta versão sobrecarregada do save função funciona para mim:

yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ("Overwrite"))

O exemplo acima substituiria uma pasta existente. O savemode pode usar esses parâmetros também ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):

Acrescentar : o modo Anexar significa que, ao salvar um DataFrame em uma fonte de dados, se os dados / tabela já existirem, o conteúdo do DataFrame deve ser anexado aos dados existentes.

ErrorIfExists : o modo ErrorIfExists significa que, ao salvar um DataFrame em uma fonte de dados, se os dados já existirem, espera-se que uma exceção seja lançada.

Ignorar : o modo Ignorar significa que ao salvar um DataFrame em uma fonte de dados, se os dados já existirem, espera-se que a operação de salvamento não salve o conteúdo do DataFrame e não altere os dados existentes.

Shay
fonte
1

Se você estiver disposto a usar seu próprio formato de saída personalizado, também poderá obter o comportamento desejado com RDD.

Dê uma olhada nas seguintes classes: FileOutputFormat , FileOutputCommitter

No formato de saída de arquivo, você tem um método chamado checkOutputSpecs, que verifica se o diretório de saída existe. Em FileOutputCommitter você tem o commitJob, que normalmente está transferindo dados do diretório temporário para seu local final.

Eu não fui capaz de verificar ainda (faria isso, assim que tivesse alguns minutos livres), mas teoricamente: Se eu estender FileOutputFormat e substituir checkOutputSpecs para um método que não lança exceção no diretório já existe, e ajustar o O método commitJob do meu committer de saída personalizado para executar qualquer lógica que eu quiser (por exemplo, substituir alguns dos arquivos, anexar outros) do que eu posso conseguir atingir o comportamento desejado com RDDs também.

O formato de saída é passado para: saveAsNewAPIHadoopFile (que é o método saveAsTextFile chamado também para realmente salvar os arquivos). E o committer de saída é configurado no nível do aplicativo.

Michael Kopaniov
fonte
Eu evitaria chegar perto de criar uma subclasse de FileOutputCommitter se você puder evitar: esse é um código assustador. O Hadoop 3.0 adiciona um ponto de plug-in onde FileOutputFormat pode ter diferentes implementações de uma superclasse refatorada (PathOutputCommitter). O S3 da Netflix gravará no local em uma árvore particionada, fazendo apenas a resolução de conflitos (falhar, excluir, adicionar) na confirmação do trabalho e apenas nas partições atualizadas
nível