Spark, idealmente dividindo um único RDD em dois

10

Eu tenho um grande conjunto de dados que preciso dividir em grupos de acordo com parâmetros específicos. Quero que o trabalho seja processado da maneira mais eficiente possível. Eu posso imaginar duas maneiras de fazer isso

Opção 1 - Criar mapa a partir do RDD original e filtrar

def customMapper(record):
    if passesSomeTest(record):
        return (1,record)
    else:
        return (0,record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()

Opção 2 - Filtrar o RDD original diretamente

def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()

O método fist precisa iterar todos os registros do conjunto de dados original três vezes, onde o segundo apenas o faz duas vezes, em circunstâncias normais, no entanto, a faísca gera alguns gráficos nos bastidores, para que eu possa imaginar que eles são efetivamente feito da mesma maneira. Minhas perguntas são: a.) Um método é mais eficiente que o outro, ou a construção do gráfico de faísca os torna equivalentes? B) É possível fazer essa divisão em uma única passagem?

jagartner
fonte
Eu também me encontrei com um problema muito semelhante e realmente não encontrei uma solução. Mas o que realmente acontece não está claro nesse código, porque o spark possui uma 'avaliação lenta' e é supostamente capaz de executar apenas o que realmente precisa executar, além de combinar mapas, filtros e tudo o que pode ser feito em conjunto. Então, possivelmente, o que você descreve pode acontecer em uma única passagem. No entanto, não estou familiarizado o suficiente com os mecanismos de avaliação preguiçosos. Na verdade, eu apenas notei o .cache (). Talvez haja uma maneira de fazer apenas um .cache () e obter os resultados completos?
User3780968

Respostas:

9

Antes de tudo, deixe-me dizer que não sou especialista em Spark; Eu tenho usado bastante nos últimos meses, e acredito que agora entendo, mas posso estar errado.

Então, respondendo suas perguntas:

a.) eles são equivalentes, mas não da maneira que você está vendo; O Spark não otimizará o gráfico se você estiver se perguntando, mas customMapperele ainda será executado duas vezes nos dois casos; isto é devido ao facto de que a presença de faísca, rdd1e rdd2são dois RDDS completamente diferentes, e que vai construir o gráfico de baixo para cima transformação a partir das folhas; portanto, a opção 1 será traduzida para:

rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()

Como você disse, customMapperé executado duas vezes (além disso, também rddInserá lido duas vezes, o que significa que, se vier de um banco de dados, pode ser ainda mais lento).

b.) existe uma maneira, você só precisa se mover cache()no lugar certo:

mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)

Ao fazer isso, estamos dizendo ao Spark que ele pode armazenar os resultados parciais de mappedRdd; ele usará esses resultados parciais para rdd1e rdd2. Do ponto de vista da centelha, isso é equivalente a:

mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)
StefanoP
fonte