(Por que) precisamos chamar cache ou persistir em um RDD

171

Quando um conjunto de dados distribuído resiliente (RDD) é criado a partir de um arquivo ou coleção de texto (ou de outro RDD), precisamos chamar "cache" ou "persistir" explicitamente para armazenar os dados do RDD na memória? Ou os dados RDD são armazenados de forma distribuída na memória por padrão?

val textFile = sc.textFile("/user/emp.txt")

De acordo com meu entendimento, após a etapa acima, o textFile é um RDD e está disponível em toda / parte da memória do nó.

Nesse caso, por que precisamos chamar "cache" ou "persistir" no textFile RDD?

Ramana
fonte

Respostas:

300

A maioria das operações de RDD é preguiçosa. Pense em um RDD como uma descrição de uma série de operações. Um RDD não é um dado. Então esta linha:

val textFile = sc.textFile("/user/emp.txt")

Não faz nada. Ele cria um RDD que diz "precisaremos carregar este arquivo". O arquivo não está carregado neste momento.

As operações de RDD que exigem a observação do conteúdo dos dados não podem ser preguiçosas. (Essas são chamadas de ações .) Um exemplo é RDD.count- para informar o número de linhas no arquivo, o arquivo precisa ser lido. Portanto, se você escrever textFile.count, nesse ponto o arquivo será lido, as linhas serão contadas e a contagem será retornada.

E se você ligar de textFile.countnovo? A mesma coisa: o arquivo será lido e contado novamente. Nada é armazenado. Um RDD não é um dado.

Então o que RDD.cachefaz? Se você adicionar textFile.cacheao código acima:

val textFile = sc.textFile("/user/emp.txt")
textFile.cache

Não faz nada. RDD.cachetambém é uma operação preguiçosa. O arquivo ainda não foi lido. Mas agora o RDD diz "leia este arquivo e depois armazene em cache o conteúdo". Se você executar textFile.countpela primeira vez, o arquivo será carregado, armazenado em cache e contado. Se você ligar textFile.countpela segunda vez, a operação usará o cache. Ele apenas pega os dados do cache e conta as linhas.

O comportamento do cache depende da memória disponível. Se o arquivo não couber na memória, por exemplo, ele textFile.countretornará ao comportamento usual e relerá o arquivo.

Daniel Darabos
fonte
4
Olá Daniel, - quando você chama cache, isso significa que o RDD não é recarregado da fonte (por exemplo, arquivo de texto) - como você pode ter certeza de que os dados do arquivo de texto são mais recentes quando armazenados em cache? (faz figura faísca isso ou é uma operação manual para unpersist () periodicamente para garantir os dados de origem fica recalculada posteriormente na linhagem?)
andrew.butkus
Além disso - se você precisar não persistir periodicamente, - se você tiver um RDD armazenado em cache, dependente de outro RDD que está armazenado em cache, deverá remover o dispersão de ambos os RDDs para ver resultados recomputados?
andrew.butkus
21
O Spark apenas assume que o arquivo nunca será alterado. Ele lê o arquivo em um ponto arbitrário no tempo e pode reler partes dele conforme necessário posteriormente. (Por exemplo, se uma parte dos dados foi removida do cache.) É melhor manter os arquivos inalterados! Basta criar um novo arquivo com um novo nome quando tiver novos dados e carregá-lo como um novo RDD. Se você estiver obtendo novos dados continuamente, consulte o Spark Streaming.
Daniel Darabos
10
Sim. Os RDDs são imutáveis, portanto, todo RDD assume que suas dependências também são imutáveis. O Spark Streaming permite que você configure essas árvores que operam em um fluxo de alterações. Mas uma solução ainda mais simples é construir a árvore em uma função que usa um nome de arquivo como parâmetro. Em seguida, basta chamar a função para o novo arquivo e poof, você terá a nova árvore de computação.
Daniel Darabos
1
@ Humoyun: Na guia Storage da Spark UI, você pode ver quanto de cada RDD é armazenado em cache. Os dados podem ser tão grandes que apenas 40% deles cabem na memória total que você possui para armazenar em cache. Uma opção nesse caso é usar perisiste escolher uma opção de armazenamento que permita derramar os dados do cache em disco.
DanielDarabos
197

Eu acho que a pergunta seria melhor formulada como:

Quando precisamos chamar o cache ou persistir em um RDD?

Os processos de faísca são preguiçosos, ou seja, nada acontecerá até que seja necessário. Para responder rapidamente à pergunta, após a val textFile = sc.textFile("/user/emp.txt")emissão, nada acontece com os dados, apenas a HadoopRDDé construído, usando o arquivo como fonte.

Digamos que transformamos esses dados um pouco:

val wordsRDD = textFile.flatMap(line => line.split("\\W"))

Novamente, nada acontece com os dados. Agora, há um novo RDD wordsRDDque contém uma referência testFilee uma função a serem aplicadas quando necessário.

Somente quando uma ação é chamada em um RDD, como wordsRDD.counta cadeia RDD, chamada linhagem , será executada. Ou seja, os dados, divididos em partições, serão carregados pelos executores do cluster Spark, a flatMapfunção será aplicada e o resultado será calculado.

Em uma linhagem linear, como a deste exemplo, cache()não é necessário. Os dados serão carregados para os executores, todas as transformações serão aplicadas e, finalmente count, serão computadas, todas na memória - se os dados couberem na memória.

cacheé útil quando a linhagem do RDD se ramifica. Digamos que você queira filtrar as palavras do exemplo anterior em uma contagem de palavras positivas e negativas. Você poderia fazer isso assim:

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Aqui, cada ramificação emite uma recarga dos dados. A adição de uma cachedeclaração explícita garantirá que o processamento feito anteriormente seja preservado e reutilizado. O trabalho terá a seguinte aparência:

val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Por esse motivo, cacheé dito que 'quebra a linhagem', pois cria um ponto de verificação que pode ser reutilizado para processamento adicional.

Regra geral: use cachequando a linhagem do seu RDD se ramifica ou quando um RDD é usado várias vezes como em um loop.

maasg
fonte
1
Impressionante. Obrigado. Mais uma pergunta relacionada. Quando armazenamos em cache ou persistimos, os dados serão armazenados na memória do executor ou na memória do nó do trabalhador. Se for a memória do executor, o How Spark identifica qual executor possui os dados.
Ramana
1
@RamanaUppala a memória do executor é usada. A fração da memória do executor usada para armazenamento em cache é controlada pela configuração spark.storage.memoryFraction. Com relação a qual executor possui quais dados, um RDD manterá o controle de suas partições que são distribuídas nos executores.
maasg
5
@maasg me corrija se eu estiver errado, mas nem cachenem persist pode quebrar a linhagem .
zero323
Onde as palavras RDD seriam armazenadas se não tivéssemos a instrução .cache () no exemplo acima?
sun_dare
e se antes das duas contagens, unirmos as duas ramificações de volta a uma primeira e contarmos? Nesse caso, o cache é benéfico?
Xiawei Zhang
30

Precisamos chamar "cache" ou "persistir" explicitamente para armazenar os dados RDD na memória?

Sim, apenas se necessário.

Os dados RDD armazenados de forma distribuída na memória por padrão?

Não!

E estas são as razões pelas quais:

  • O Spark suporta dois tipos de variáveis ​​compartilhadas: variáveis ​​de broadcast, que podem ser usadas para armazenar em cache um valor na memória em todos os nós, e acumuladores, que são variáveis ​​que são apenas "adicionadas" a, como contadores e somas.

  • Os RDDs suportam dois tipos de operações: transformações, que criam um novo conjunto de dados a partir de um já existente, e ações, que retornam um valor ao programa do driver após executar um cálculo no conjunto de dados. Por exemplo, map é uma transformação que passa cada elemento do conjunto de dados por uma função e retorna um novo RDD representando os resultados. Por outro lado, reduzir é uma ação que agrega todos os elementos do RDD usando alguma função e retorna o resultado final ao programa do driver (embora também exista um reduzirByKey paralelo que retorne um conjunto de dados distribuído).

  • Todas as transformações no Spark são preguiçosas, pois não calculam seus resultados imediatamente. Em vez disso, eles apenas lembram as transformações aplicadas a algum conjunto de dados base (por exemplo, um arquivo). As transformações são computadas apenas quando uma ação exige que um resultado seja retornado ao programa do driver. Esse design permite que o Spark seja executado com mais eficiência - por exemplo, podemos perceber que um conjunto de dados criado pelo mapa será usado em uma redução e retornará apenas o resultado da redução ao driver, em vez do conjunto de dados mapeado maior.

  • Por padrão, cada RDD transformado pode ser recalculado toda vez que você executa uma ação nele. No entanto, você também pode persistir um RDD na memória usando o método persist (ou cache); nesse caso, o Spark manterá os elementos no cluster para um acesso muito mais rápido na próxima vez que você o consultar. Também há suporte para RDDs persistentes no disco ou replicados em vários nós.

Para mais detalhes, consulte o guia de programação do Spark .

eliasah
fonte
1
Isso não respondeu à minha pergunta.
Ramana
O que não responde?
11135 eliasah
1
quando os dados do RDD são armazenados na memória padrão, por que precisamos chamar Cache ou Persist?
Ramana
Do RDD não são armazenados na memória por padrão, de modo que persiste o RDD faz faísca executar a transformação mais rápida no cluster
eliasah
2
É uma boa resposta, não sei por que foi rebaixado. É uma resposta de cima para baixo, explicando como os RDDs funcionam a partir dos conceitos de alto nível. Eu adicionei outra resposta que vai de baixo para cima: começando com "o que essa linha faz". Talvez seja mais fácil seguir alguém que está começando com o Spark.
Daniel Delabos
11

Abaixo estão as três situações em que você deve armazenar em cache seus RDDs:

usando um RDD muitas vezes

executando várias ações no mesmo RDD

para longas cadeias de transformações (ou muito caras)

rileyss
fonte
7

Adicionando outro motivo para adicionar (ou adicionar temporariamente) a cachechamada de método.

para problemas de memória de depuração

Com o cachemétodo, o spark fornecerá informações de depuração relacionadas ao tamanho do RDD. portanto, na interface do usuário integrada do spark, você obterá informações sobre o consumo de memória RDD. e isso se mostrou muito útil no diagnóstico de problemas de memória.

zinking
fonte