Eu tenho um grande conjunto de dados que preciso dividir em grupos de acordo com parâmetros específicos. Quero que o trabalho seja processado da maneira mais eficiente possível. Eu posso imaginar duas maneiras de fazer isso
Opção 1 - Criar mapa a partir do RDD original e filtrar
def customMapper(record):
if passesSomeTest(record):
return (1,record)
else:
return (0,record)
mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()
Opção 2 - Filtrar o RDD original diretamente
def customFilter(record):
return passesSomeTest(record)
rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()
O método fist precisa iterar todos os registros do conjunto de dados original três vezes, onde o segundo apenas o faz duas vezes, em circunstâncias normais, no entanto, a faísca gera alguns gráficos nos bastidores, para que eu possa imaginar que eles são efetivamente feito da mesma maneira. Minhas perguntas são: a.) Um método é mais eficiente que o outro, ou a construção do gráfico de faísca os torna equivalentes? B) É possível fazer essa divisão em uma única passagem?
fonte
Respostas:
Antes de tudo, deixe-me dizer que não sou especialista em Spark; Eu tenho usado bastante nos últimos meses, e acredito que agora entendo, mas posso estar errado.
Então, respondendo suas perguntas:
a.) eles são equivalentes, mas não da maneira que você está vendo; O Spark não otimizará o gráfico se você estiver se perguntando, mas
customMapper
ele ainda será executado duas vezes nos dois casos; isto é devido ao facto de que a presença de faísca,rdd1
erdd2
são dois RDDS completamente diferentes, e que vai construir o gráfico de baixo para cima transformação a partir das folhas; portanto, a opção 1 será traduzida para:Como você disse,
customMapper
é executado duas vezes (além disso, tambémrddIn
será lido duas vezes, o que significa que, se vier de um banco de dados, pode ser ainda mais lento).b.) existe uma maneira, você só precisa se mover
cache()
no lugar certo:Ao fazer isso, estamos dizendo ao Spark que ele pode armazenar os resultados parciais de
mappedRdd
; ele usará esses resultados parciais parardd1
erdd2
. Do ponto de vista da centelha, isso é equivalente a:fonte