Estou explorando o comportamento do Spark ao ingressar em uma tabela. Eu estou usando Databricks.
Meu cenário fictício é:
Leia uma tabela externa como dataframe A (os arquivos subjacentes estão no formato delta)
Defina o quadro de dados B como o quadro de dados A com apenas determinadas colunas selecionadas
Unir os quadros de dados A e B na coluna1 e coluna2
(Sim, não faz muito sentido, estou apenas tentando entender a mecânica subjacente do Spark)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
b = a.select("column1", "column2", "columnA")
c= a.join(b, how="left", on = ["column1", "column2"])
Minha primeira tentativa foi executar o código como está (tentativa 1). Tentei reparticionar e armazenar em cache (tentativa 2)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
Finalmente, reparticionei, classifiquei e coloquei em cache
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()
Os respectivos dags gerados são os anexados.
Minhas perguntas são:
Por que, na tentativa 1, a tabela parece estar em cache, mesmo que o cache não tenha sido especificado explicitamente.
Por que InMemoreTableScan é sempre seguido por outro nó desse tipo.
Por que na tentativa 3 o cache parece ocorrer em dois estágios?
Por que na tentativa 3 WholeStageCodegen segue um (e apenas um) InMemoreTableScan.
Respostas:
O que você está observando nesses três planos é uma mistura de tempo de execução do DataBricks e Spark.
Primeiro, durante a execução do DataBricks 3.3+, o cache é ativado automaticamente para todos os arquivos em parquet. Configuração correspondente para isso:
spark.databricks.io.cache.enabled true
Para sua segunda consulta, o InMemoryTableScan está acontecendo duas vezes porque, exatamente quando a associação foi chamada, o spark tentou calcular o Conjunto de Dados A e o Conjunto de Dados B em paralelo. Supondo que diferentes executores tenham recebido as tarefas acima, ambos terão que verificar a tabela a partir do cache (DataBricks).
Para o terceiro, InMemoryTableScan não se refere ao cache em si. Significa apenas que qualquer catalisador de plano formado envolveu a varredura da tabela em cache várias vezes.
PS: Não consigo visualizar o ponto 4 :)
fonte