Eu tenho um quadro de dados com o seguinte código:
def test(lat: Double, lon: Double) = {
println(s"testing ${lat / lon}")
Map("one" -> "one", "two" -> "two")
}
val testUDF = udf(test _)
df.withColumn("test", testUDF(col("lat"), col("lon")))
.withColumn("test1", col("test.one"))
.withColumn("test2", col("test.two"))
Agora, verificando os logs, descobri que para cada linha a UDF é executada 3 vezes. Se eu adicionar o "test3" de uma coluna "test.three", o UDF será executado novamente.
Alguém pode me explicar o porquê?
Isso pode ser evitado corretamente (sem armazenar em cache o quadro de dados após a adição de "teste", mesmo que isso funcione)?
scala
apache-spark
apache-spark-sql
Rolintocour
fonte
fonte
Map
e não um Struct. Agora, em vez de retornar um mapa, se o UDF retornar uma classe de caso como Test (uma String, duas: String), natest
verdade, é uma Struct, mas sempre haverá muitas execuções da UDF.Respostas:
Se você deseja evitar várias chamadas para um udf (o que é útil especialmente se o udf é um gargalo no seu trabalho), você pode fazer o seguinte:
Basicamente, você diz ao Spark que sua função não é determinística e agora o Spark garante que ele seja chamado apenas uma vez porque não é seguro chamá-lo várias vezes (cada chamada pode retornar um resultado diferente).
Lembre-se também de que esse truque não é gratuito. Ao fazer isso, você está colocando algumas restrições no otimizador; um efeito colateral disso é, por exemplo, que o otimizador Spark não envia filtros através de expressões que não são determinísticas, para que você se torne responsável pela otimização ideal. posição dos filtros na sua consulta.
fonte
asNondeterministic
obriga a UDF a executar apenas uma vez. Com aexplode(array(myUdf($"id")))
solução, ele ainda é executado duas vezes.