Eu tenho um aplicativo de streaming de Spark que produz um conjunto de dados para cada minuto. Preciso salvar / sobrescrever os resultados dos dados processados.
Quando tentei sobrescrever o dataset org.apache.hadoop.mapred.FileAlreadyExistsException para a execução.
Eu defino a propriedade Spark set("spark.files.overwrite","true")
, mas não dá sorte.
Como sobrescrever ou Predelete os arquivos do Spark?
apache-spark
Vijay Innamuri
fonte
fonte
set("spark.files.overwrite","true")
funciona apenas para arquivos adicionados por meio despark.addFile()
Respostas:
ATUALIZAÇÃO: Sugira o uso
Dataframes
, além de algo parecido... .write.mode(SaveMode.Overwrite) ...
.Cafetão prático:
Para versões mais antigas, tente
Em 1.1.0 você pode definir as configurações de configuração usando o script spark-submit com a sinalização --conf.
AVISO (versões anteriores): De acordo com @piggybox, há um bug no Spark onde ele apenas sobrescreverá os arquivos necessários para gravá-los
part-
, todos os outros arquivos serão deixados sem remoção.fonte
Spark 1.4
:df.write.mode(SaveMode.Overwrite).parquet(path)
df.write.mode(mode: String).parquet(path)
modo Where: String pode ser: "overwrite", "append", "ignore", "error".pois
df.save(path, source, mode)
está obsoleto, ( http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame )usar
df.write.format(source).mode("overwrite").save(path)
onde df.write é DataFrameWriter
'source' pode ser ("com.databricks.spark.avro" | "parquet" | "json")
fonte
source
também pode sercsv
A documentação para o parâmetro
spark.files.overwrite
diz o seguinte: "Substituir arquivos adicionados por meio deSparkContext.addFile()
quando o arquivo de destino existe e seu conteúdo não corresponde ao da fonte." Portanto, não tem efeito no método saveAsTextFiles.Você pode fazer isso antes de salvar o arquivo:
Aas explicado aqui: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html
fonte
Na documentação pyspark.sql.DataFrame.save (atualmente em 1.3.1), você pode especificar
mode='overwrite'
ao salvar um DataFrame:Eu verifiquei que isso removerá até mesmo arquivos de partição restantes. Portanto, se você tinha, digamos, 10 partições / arquivos originalmente, mas substituiu a pasta com um DataFrame que tinha apenas 6 partições, a pasta resultante terá as 6 partições / arquivos.
Consulte a documentação do Spark SQL para obter mais informações sobre as opções de modo.
fonte
spark.hadoop.validateOutputSpecs
funcionará em todas as APIs do Spark.spark.hadoop.validateOutputSpecs
não funcionou para mim no 1.3, mas funciona.save(... , mode=
rota, você pode sobrescrever um conjunto de arquivos, anexar outro, etc. dentro do mesmo contexto do Spark. Não ospark.hadoop.validateOutputSpecs
limitaria a apenas um modo por contexto?df.write.mode('overwrite').parquet("/output/folder/path")
funciona se você deseja sobrescrever um arquivo parquet usando python. Isso está no faísca 1.6.2. API pode ser diferente em versões posterioresfonte
fonte
df.write.mode(SaveMode.Overwrite)
Esta versão sobrecarregada do save função funciona para mim:
yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ("Overwrite"))
O exemplo acima substituiria uma pasta existente. O savemode pode usar esses parâmetros também ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):
Acrescentar : o modo Anexar significa que, ao salvar um DataFrame em uma fonte de dados, se os dados / tabela já existirem, o conteúdo do DataFrame deve ser anexado aos dados existentes.
ErrorIfExists : o modo ErrorIfExists significa que, ao salvar um DataFrame em uma fonte de dados, se os dados já existirem, espera-se que uma exceção seja lançada.
Ignorar : o modo Ignorar significa que ao salvar um DataFrame em uma fonte de dados, se os dados já existirem, espera-se que a operação de salvamento não salve o conteúdo do DataFrame e não altere os dados existentes.
fonte
Se você estiver disposto a usar seu próprio formato de saída personalizado, também poderá obter o comportamento desejado com RDD.
Dê uma olhada nas seguintes classes: FileOutputFormat , FileOutputCommitter
No formato de saída de arquivo, você tem um método chamado checkOutputSpecs, que verifica se o diretório de saída existe. Em FileOutputCommitter você tem o commitJob, que normalmente está transferindo dados do diretório temporário para seu local final.
Eu não fui capaz de verificar ainda (faria isso, assim que tivesse alguns minutos livres), mas teoricamente: Se eu estender FileOutputFormat e substituir checkOutputSpecs para um método que não lança exceção no diretório já existe, e ajustar o O método commitJob do meu committer de saída personalizado para executar qualquer lógica que eu quiser (por exemplo, substituir alguns dos arquivos, anexar outros) do que eu posso conseguir atingir o comportamento desejado com RDDs também.
O formato de saída é passado para: saveAsNewAPIHadoopFile (que é o método saveAsTextFile chamado também para realmente salvar os arquivos). E o committer de saída é configurado no nível do aplicativo.
fonte