Quando o cache expirou para um RDD no pyspark?

10

Usamos .cache()no RDD para cache persistente de um conjunto de dados. Minha preocupação é quando esse cache será expirado?

dt = sc.parallelize([2, 3, 4, 5, 6])
dt.cache()
Krishna Prasad
fonte

Respostas:

12

Ele não expirará até que o Spark esteja sem memória; nesse ponto, removerá os RDDs do cache que são usados ​​com menos frequência. Quando você pede algo que foi desanexado, ele recalcula o pipeline e o coloca em cache novamente. Se isso for muito caro, desista dos outros RDDs, não os armazene em cache em primeiro lugar nem os persista no sistema de arquivos.

Jan van der Vegt
fonte
6

Além da resposta de Jan, gostaria de salientar que o armazenamento RDD serializado (/ cache) funciona muito melhor do que o cache RDD normal para grandes conjuntos de dados .

Também ajuda a otimizar a coleta de lixo, no caso de grandes conjuntos de dados.

Além disso, nos documentos do Spark:

Quando seus objetos ainda são grandes demais para serem armazenados com eficiência, apesar desse ajuste, uma maneira muito mais simples de reduzir o uso de memória é armazená-los em formato serializado, usando StorageLevels serializados na API de persistência RDD, como MEMORY_ONLY_SER. O Spark armazenará cada partição RDD como uma grande matriz de bytes. A única desvantagem de armazenar dados na forma serializada é o tempo de acesso mais lento, devido à necessidade de desserializar cada objeto em tempo real. É altamente recomendável usar o Kryo se você deseja armazenar em cache dados em formato serializado, pois isso leva a tamanhos muito menores que a serialização Java (e certamente a objetos Java brutos).

Dawny33
fonte
Apenas uma observação: MEMORY_ONLY_SERestá disponível apenas no Scala / Java, não no Python.
Def_Os
1

O Spark irá persistir / limpar automaticamente o RDD ou o Dataframe se o RDD não for mais usado. Para verificar se um RDD está armazenado em cache, verifique a interface do usuário do Spark e verifique a guia Armazenamento e verifique os detalhes da memória.

No terminal, você pode usar rdd.unpersist()ou sqlContext.uncacheTable("sparktable")remover o RDD ou tabelas da Memória. Spark feito para Lazy Evaluation, a menos que e até que você diga alguma ação, ele não carrega ou processa nenhum dado no RDD ou no DataFrame.

KayV
fonte