Apache Spark: impacto do reparticionamento, classificação e armazenamento em cache em uma junção

10

Estou explorando o comportamento do Spark ao ingressar em uma tabela. Eu estou usando Databricks.

Meu cenário fictício é:

  1. Leia uma tabela externa como dataframe A (os arquivos subjacentes estão no formato delta)

  2. Defina o quadro de dados B como o quadro de dados A com apenas determinadas colunas selecionadas

  3. Unir os quadros de dados A e B na coluna1 e coluna2

(Sim, não faz muito sentido, estou apenas tentando entender a mecânica subjacente do Spark)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))

b = a.select("column1", "column2", "columnA")

c= a.join(b, how="left", on = ["column1", "column2"])

Minha primeira tentativa foi executar o código como está (tentativa 1). Tentei reparticionar e armazenar em cache (tentativa 2)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()

Finalmente, reparticionei, classifiquei e coloquei em cache

 a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()

Os respectivos dags gerados são os anexados.

Minhas perguntas são:

  1. Por que, na tentativa 1, a tabela parece estar em cache, mesmo que o cache não tenha sido especificado explicitamente.

  2. Por que InMemoreTableScan é sempre seguido por outro nó desse tipo.

  3. Por que na tentativa 3 o cache parece ocorrer em dois estágios?

  4. Por que na tentativa 3 WholeStageCodegen segue um (e apenas um) InMemoreTableScan.

tentativa 1

tentativa 2

insira a descrição da imagem aqui

Dawid
fonte
Eu suspeito que o leitor DataFrame armazena em cache os dados automaticamente quando a fonte é uma tabela externa. Tenho uma situação semelhante em que estou lendo dados de uma tabela de banco de dados, enquanto é possível baixar a guia "SQL" em 'UI de detalhes do aplicativo' mostra o número de linhas sendo baixadas, mas nenhum arquivo foi salvo ainda no local especificado . Eu acho que ele conhece a contagem, porque ele armazena em cache os dados em algum lugar e é isso que aparece no DAG. Se você ler dados de um arquivo de texto localmente, não verá o estado do cache.
Salim

Respostas:

4

O que você está observando nesses três planos é uma mistura de tempo de execução do DataBricks e Spark.

Primeiro, durante a execução do DataBricks 3.3+, o cache é ativado automaticamente para todos os arquivos em parquet. Configuração correspondente para isso: spark.databricks.io.cache.enabled true

Para sua segunda consulta, o InMemoryTableScan está acontecendo duas vezes porque, exatamente quando a associação foi chamada, o spark tentou calcular o Conjunto de Dados A e o Conjunto de Dados B em paralelo. Supondo que diferentes executores tenham recebido as tarefas acima, ambos terão que verificar a tabela a partir do cache (DataBricks).

Para o terceiro, InMemoryTableScan não se refere ao cache em si. Significa apenas que qualquer catalisador de plano formado envolveu a varredura da tabela em cache várias vezes.

PS: Não consigo visualizar o ponto 4 :)

Ashvjit Singh
fonte