A maioria das operações de RDD é preguiçosa. Pense em um RDD como uma descrição de uma série de operações. Um RDD não é um dado. Então esta linha:
val textFile = sc.textFile("/user/emp.txt")
Não faz nada. Ele cria um RDD que diz "precisaremos carregar este arquivo". O arquivo não está carregado neste momento.
As operações de RDD que exigem a observação do conteúdo dos dados não podem ser preguiçosas. (Essas são chamadas de ações .) Um exemplo é RDD.count
- para informar o número de linhas no arquivo, o arquivo precisa ser lido. Portanto, se você escrever textFile.count
, nesse ponto o arquivo será lido, as linhas serão contadas e a contagem será retornada.
E se você ligar de textFile.count
novo? A mesma coisa: o arquivo será lido e contado novamente. Nada é armazenado. Um RDD não é um dado.
Então o que RDD.cache
faz? Se você adicionar textFile.cache
ao código acima:
val textFile = sc.textFile("/user/emp.txt")
textFile.cache
Não faz nada. RDD.cache
também é uma operação preguiçosa. O arquivo ainda não foi lido. Mas agora o RDD diz "leia este arquivo e depois armazene em cache o conteúdo". Se você executar textFile.count
pela primeira vez, o arquivo será carregado, armazenado em cache e contado. Se você ligar textFile.count
pela segunda vez, a operação usará o cache. Ele apenas pega os dados do cache e conta as linhas.
O comportamento do cache depende da memória disponível. Se o arquivo não couber na memória, por exemplo, ele textFile.count
retornará ao comportamento usual e relerá o arquivo.
perisist
e escolher uma opção de armazenamento que permita derramar os dados do cache em disco.Eu acho que a pergunta seria melhor formulada como:
Quando precisamos chamar o cache ou persistir em um RDD?
Os processos de faísca são preguiçosos, ou seja, nada acontecerá até que seja necessário. Para responder rapidamente à pergunta, após a
val textFile = sc.textFile("/user/emp.txt")
emissão, nada acontece com os dados, apenas aHadoopRDD
é construído, usando o arquivo como fonte.Digamos que transformamos esses dados um pouco:
Novamente, nada acontece com os dados. Agora, há um novo RDD
wordsRDD
que contém uma referênciatestFile
e uma função a serem aplicadas quando necessário.Somente quando uma ação é chamada em um RDD, como
wordsRDD.count
a cadeia RDD, chamada linhagem , será executada. Ou seja, os dados, divididos em partições, serão carregados pelos executores do cluster Spark, aflatMap
função será aplicada e o resultado será calculado.Em uma linhagem linear, como a deste exemplo,
cache()
não é necessário. Os dados serão carregados para os executores, todas as transformações serão aplicadas e, finalmentecount
, serão computadas, todas na memória - se os dados couberem na memória.cache
é útil quando a linhagem do RDD se ramifica. Digamos que você queira filtrar as palavras do exemplo anterior em uma contagem de palavras positivas e negativas. Você poderia fazer isso assim:Aqui, cada ramificação emite uma recarga dos dados. A adição de uma
cache
declaração explícita garantirá que o processamento feito anteriormente seja preservado e reutilizado. O trabalho terá a seguinte aparência:Por esse motivo,
cache
é dito que 'quebra a linhagem', pois cria um ponto de verificação que pode ser reutilizado para processamento adicional.Regra geral: use
cache
quando a linhagem do seu RDD se ramifica ou quando um RDD é usado várias vezes como em um loop.fonte
spark.storage.memoryFraction
. Com relação a qual executor possui quais dados, um RDD manterá o controle de suas partições que são distribuídas nos executores.cache
nempersist
pode quebrar a linhagem .Precisamos chamar "cache" ou "persistir" explicitamente para armazenar os dados RDD na memória?
Sim, apenas se necessário.
Os dados RDD armazenados de forma distribuída na memória por padrão?
Não!
E estas são as razões pelas quais:
O Spark suporta dois tipos de variáveis compartilhadas: variáveis de broadcast, que podem ser usadas para armazenar em cache um valor na memória em todos os nós, e acumuladores, que são variáveis que são apenas "adicionadas" a, como contadores e somas.
Os RDDs suportam dois tipos de operações: transformações, que criam um novo conjunto de dados a partir de um já existente, e ações, que retornam um valor ao programa do driver após executar um cálculo no conjunto de dados. Por exemplo, map é uma transformação que passa cada elemento do conjunto de dados por uma função e retorna um novo RDD representando os resultados. Por outro lado, reduzir é uma ação que agrega todos os elementos do RDD usando alguma função e retorna o resultado final ao programa do driver (embora também exista um reduzirByKey paralelo que retorne um conjunto de dados distribuído).
Todas as transformações no Spark são preguiçosas, pois não calculam seus resultados imediatamente. Em vez disso, eles apenas lembram as transformações aplicadas a algum conjunto de dados base (por exemplo, um arquivo). As transformações são computadas apenas quando uma ação exige que um resultado seja retornado ao programa do driver. Esse design permite que o Spark seja executado com mais eficiência - por exemplo, podemos perceber que um conjunto de dados criado pelo mapa será usado em uma redução e retornará apenas o resultado da redução ao driver, em vez do conjunto de dados mapeado maior.
Por padrão, cada RDD transformado pode ser recalculado toda vez que você executa uma ação nele. No entanto, você também pode persistir um RDD na memória usando o método persist (ou cache); nesse caso, o Spark manterá os elementos no cluster para um acesso muito mais rápido na próxima vez que você o consultar. Também há suporte para RDDs persistentes no disco ou replicados em vários nós.
Para mais detalhes, consulte o guia de programação do Spark .
fonte
Abaixo estão as três situações em que você deve armazenar em cache seus RDDs:
fonte
Adicionando outro motivo para adicionar (ou adicionar temporariamente) a
cache
chamada de método.para problemas de memória de depuração
Com o
cache
método, o spark fornecerá informações de depuração relacionadas ao tamanho do RDD. portanto, na interface do usuário integrada do spark, você obterá informações sobre o consumo de memória RDD. e isso se mostrou muito útil no diagnóstico de problemas de memória.fonte