Uma grande diferença, não mencionada em nenhuma outra resposta stackoverflow relacionada a este tópico claramente, é que reducedeve ser dado um monóide comutativo , ou seja, uma operação que é tanto comutativa quanto associativa. Isso significa que a operação pode ser paralelizada.
Essa distinção é muito importante para Big Data / MPP / computação distribuída e toda a razão pela qual reduceexiste. A coleção pode ser cortada e a reducelata operar em cada pedaço, então a reducelata operar nos resultados de cada pedaço - na verdade, o nível de pedaço não precisa parar um nível de profundidade. Poderíamos cortar cada pedaço também. É por isso que somar inteiros em uma lista é O (log N) se dado um número infinito de CPUs.
Se você apenas olhar para as assinaturas, não há razão para reduceexistir, porque você pode conseguir tudo o que pode reducecom um foldLeft. A funcionalidade de foldLefté maior do que a funcionalidade de reduce.
Mas você não pode paralelizar a foldLeft, então seu tempo de execução é sempre O (N) (mesmo se você alimentar um monóide comutativo). Isso ocorre porque é assumido que a operação não é um monóide comutativo e, portanto, o valor acumulado será calculado por uma série de agregações sequenciais.
foldLeftnão assume comutatividade nem associatividade. É a associatividade que dá a capacidade de fragmentar a coleção e é a comutatividade que facilita a acumulação porque a ordem não é importante (portanto, não importa a ordem de agregar cada um dos resultados de cada um dos blocos). Estritamente falando, a comutatividade não é necessária para a paralelização, por exemplo, algoritmos de classificação distribuída, ela apenas torna a lógica mais fácil porque você não precisa dar uma ordem aos seus pedaços.
Se você der uma olhada na documentação do Spark reduce, diz especificamente "... operador binário comutativo e associativo"
Aqui está a prova de que reduceNÃO é apenas um caso especial defoldLeft
scala>val intParList:ParSeq[Int]=(1 to 100000).map(_ => scala.util.Random.nextInt()).par
scala> timeMany(1000, intParList.reduce(_ + _))Took462.395867 milli seconds
scala> timeMany(1000, intParList.foldLeft(0)(_ + _))Took2589.363031 milli seconds
reduzir vs dobrar
Agora é aqui que fica um pouco mais perto das raízes matemáticas / FP, e um pouco mais complicado de explicar. Reduzir é definido formalmente como parte do paradigma MapReduce, que lida com coleções sem ordem (multisets), Fold é formalmente definido em termos de recursão (ver catamorfismo) e, portanto, assume uma estrutura / sequência para as coleções.
Não há nenhum foldmétodo em Scalding porque no modelo de programação (estrito) Map Reduce não podemos definir foldporque os chunks não têm uma ordem e foldrequerem apenas associatividade, não comutatividade.
Simplificando, reducefunciona sem uma ordem de acumulação, foldrequer uma ordem de acumulação e é essa ordem de acumulação que necessita de um valor zero, NÃO a existência do valor zero que os distingue. Estritamente falando, reducedeve funcionar em uma coleção vazia, porque seu valor zero pode ser deduzido tomando um valor arbitrário xe depois resolvendo x op y = x, mas isso não funciona com uma operação não comutativa, pois pode haver um valor zero à esquerda e à direita que são distintos (ou seja x op y != y op x). É claro que Scala não se preocupa em descobrir qual é esse valor zero, pois isso exigiria alguns cálculos matemáticos (que provavelmente são incomputáveis), então apenas lança uma exceção.
Parece (como é frequentemente o caso na etimologia) que este significado matemático original se perdeu, uma vez que a única diferença óbvia na programação é a assinatura. O resultado é que reducese tornou sinônimo de fold, em vez de preservar seu significado original do MapReduce. Agora, esses termos são freqüentemente usados de forma intercambiável e se comportam da mesma forma na maioria das implementações (ignorando coleções vazias). A estranheza é exacerbada por peculiaridades, como no Spark, que abordaremos agora.
Assim faísca que têm um fold, mas a ordem pela qual sub resultados (uma para cada partição) são combinadas (no momento de escrita) é da mesma ordem em que as tarefas são concluídas - e, portanto, não-determinística. Agradeço a @CafeFeed por apontar esse folduso runJob, que após ler o código percebi que não é determinístico. Mais confusão é criada pelo Spark ter um treeReducemas não treeFold.
Conclusão
Há uma diferença entre reducee foldmesmo quando aplicado a sequências não vazias. O primeiro é definido como parte do paradigma de programação MapReduce em coleções com ordem arbitrária ( http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf ) e deve-se assumir que os operadores são comutativos além de serem associativo para fornecer resultados determinísticos. Este último é definido em termos de catomorfismos e requer que as coleções tenham uma noção de sequência (ou sejam definidas recursivamente, como listas encadeadas), portanto, não requerem operadores comutativos.
Na prática, devido à natureza não matemática da programação, reducee foldtendem a se comportar da mesma forma, seja corretamente (como no Scala) ou incorretamente (como no Spark).
Extra: Minha opinião sobre a API Spark
Minha opinião é que a confusão seria evitada se o uso do termo foldfosse completamente abandonado no Spark. Pelo menos o Spark tem uma observação em sua documentação:
Isso se comporta de maneira um pouco diferente das operações de dobra implementadas para coleções não distribuídas em linguagens funcionais como Scala.
É por isso que foldLeftcontém o Leftem seu nome e também há um método chamado fold.
Kiritsuku
1
@Cloudtech Isso é uma coincidência de sua implementação de thread único, não dentro de suas especificações. Na minha máquina de 4 núcleos, se tento adicionar .par, (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _)obtenho resultados diferentes a cada vez.
samthebest
2
@AlexDean no contexto da ciência da computação, não, realmente não precisa de uma identidade, já que coleções vazias tendem a apenas lançar exceções. Mas é matematicamente mais elegante (e seria mais elegante se as coleções fizessem isso) se o elemento de identidade fosse retornado quando a coleção estivesse vazia. Em matemática, "lançar uma exceção" não existe.
samthebest
3
@samthebest: Você tem certeza sobre a comutatividade? github.com/apache/spark/blob/… diz "Para funções que não são comutativas, o resultado pode ser diferente de uma dobra aplicada a uma coleção não distribuída."
42 de
1
@ Make42 Correto, reallyFoldporém , alguém poderia escrever seu próprio cafetão, como :, isso rdd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f)não precisaria f para comutar.
samthebest
10
Se não estou enganado, mesmo que a API do Spark não exija, o fold também exige que f seja comutativo. Porque a ordem em que as partições serão agregadas não é garantida. Por exemplo, no código a seguir, apenas a primeira impressão é classificada:
Depois de algumas idas e vindas, acreditamos que você está correto. A ordem de combinação é o primeiro a chegar, primeiro a servir. Se você executar sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _)com 2+ núcleos várias vezes, acho que verá que ele produz uma ordem aleatória (em termos de partição). Eu atualizei minha resposta de acordo.
samthebest
3
foldno Apache Spark não é o mesmo que foldem coleções não distribuídas. Na verdade , requer função comutativa para produzir resultados determinísticos:
Isso se comporta de maneira um pouco diferente das operações de dobra implementadas para coleções não distribuídas em linguagens funcionais como Scala. Esta operação de dobrar pode ser aplicada a partições individualmente e, em seguida, dobrar esses resultados no resultado final, em vez de aplicar a dobra a cada elemento sequencialmente em alguma ordem definida. Para funções que não são comutativas, o resultado pode ser diferente de uma dobra aplicada a uma coleção não distribuída.
def fold(zeroValue: T)(op:(T, T)=> T): T = withScope {var jobResult: T
val cleanOp:(T, T)=> T
val foldPartition =Iterator[T]=> T
val mergeResult:(Int, T)=>Unit
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
def reduce(f:(T, T)=> T): T = withScope {val cleanF:(T, T)=> T
val reducePartition:Iterator[T]=>Option[T]var jobResult:Option[T]val mergeResult =(Int,Option[T])=>Unit
sc.runJob(this, reducePartition, mergeResult)
jobResult.getOrElse(thrownewUnsupportedOperationException("empty collection"))}
onde runJobé executado sem levar em consideração a ordem de partição e resulta na necessidade de função comutativa.
foldPartitione reducePartitionsão equivalentes em termos de ordem de processamento e efetivamente (por herança e delegação) implementados por reduceLefte foldLeftem TraversableOnce.
Conclusão: foldno RDD não pode depender da ordem dos chunks e precisa de comutatividade e associatividade .
Tenho que admitir que a etimologia é confusa e a literatura de programação carece de definições formais. Acho que é seguro dizer que foldon RDDs é realmente igual a reduce, mas isso não respeita as diferenças matemáticas básicas (atualizei minha resposta para ficar ainda mais claro). Embora eu discorde que realmente precisamos de comutatividade, desde que se tenha certeza de tudo o que seu parceiro está fazendo, ela está preservando a ordem.
samthebest
A ordem de dobra indefinida não está relacionada ao particionamento. É uma consequência direta de uma implementação runJob.
AH! Desculpe, não consegui descobrir qual era o seu ponto, mas depois de ler o runJobcódigo, vejo que de fato ele faz a combinação de acordo com quando uma tarefa é concluída, NÃO a ordem das partições. É esse detalhe importante que faz com que tudo se encaixe. Editei minha resposta novamente e, assim, corrigi o erro que você apontou. Por favor, você poderia remover sua recompensa, já que agora estamos de acordo?
samthebest
Não posso editar ou remover - essa opção não existe. Posso premiar, mas acho que você consegue alguns pontos apenas com a atenção, estou errado? Se você confirmar que quer uma recompensa, eu o faço nas próximas 24 horas. Obrigado pelas correções e desculpe por um método, mas parece que você ignora todos os avisos, é uma grande coisa, e a resposta foi citada em todos os lugares.
1
Que tal você conceder a @Mishael Rosenthal, já que ele foi o primeiro a expressar claramente a preocupação. Não tenho interesse nos pontos, só gosto de usar o SO para SEO e organização.
samthebest
2
Uma outra diferença para Scalding é o uso de combinadores no Hadoop.
Imagine que sua operação seja monóide comutativa, com a redução ela será aplicada no lado do mapa também, em vez de embaralhar / classificar todos os dados para redutores. Com foldLeft não é esse o caso.
pipe.groupBy('product){
_.reduce('price->'total){(sum:Double, price:Double)=> sum + price }// reduce is .mapReduceMap in disguise}
pipe.groupBy('product){
_.foldLeft('price->'total)(0.0){(sum:Double, price:Double)=> sum + price }}
É sempre uma boa prática definir suas operações como monóide no Scalding.
Respostas:
reduzir vs foldLeft
Uma grande diferença, não mencionada em nenhuma outra resposta stackoverflow relacionada a este tópico claramente, é que
reduce
deve ser dado um monóide comutativo , ou seja, uma operação que é tanto comutativa quanto associativa. Isso significa que a operação pode ser paralelizada.Essa distinção é muito importante para Big Data / MPP / computação distribuída e toda a razão pela qual
reduce
existe. A coleção pode ser cortada e areduce
lata operar em cada pedaço, então areduce
lata operar nos resultados de cada pedaço - na verdade, o nível de pedaço não precisa parar um nível de profundidade. Poderíamos cortar cada pedaço também. É por isso que somar inteiros em uma lista é O (log N) se dado um número infinito de CPUs.Se você apenas olhar para as assinaturas, não há razão para
reduce
existir, porque você pode conseguir tudo o que podereduce
com umfoldLeft
. A funcionalidade defoldLeft
é maior do que a funcionalidade dereduce
.Mas você não pode paralelizar a
foldLeft
, então seu tempo de execução é sempre O (N) (mesmo se você alimentar um monóide comutativo). Isso ocorre porque é assumido que a operação não é um monóide comutativo e, portanto, o valor acumulado será calculado por uma série de agregações sequenciais.foldLeft
não assume comutatividade nem associatividade. É a associatividade que dá a capacidade de fragmentar a coleção e é a comutatividade que facilita a acumulação porque a ordem não é importante (portanto, não importa a ordem de agregar cada um dos resultados de cada um dos blocos). Estritamente falando, a comutatividade não é necessária para a paralelização, por exemplo, algoritmos de classificação distribuída, ela apenas torna a lógica mais fácil porque você não precisa dar uma ordem aos seus pedaços.Se você der uma olhada na documentação do Spark
reduce
, diz especificamente "... operador binário comutativo e associativo"http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD
Aqui está a prova de que
reduce
NÃO é apenas um caso especial defoldLeft
reduzir vs dobrar
Agora é aqui que fica um pouco mais perto das raízes matemáticas / FP, e um pouco mais complicado de explicar. Reduzir é definido formalmente como parte do paradigma MapReduce, que lida com coleções sem ordem (multisets), Fold é formalmente definido em termos de recursão (ver catamorfismo) e, portanto, assume uma estrutura / sequência para as coleções.
Não há nenhum
fold
método em Scalding porque no modelo de programação (estrito) Map Reduce não podemos definirfold
porque os chunks não têm uma ordem efold
requerem apenas associatividade, não comutatividade.Simplificando,
reduce
funciona sem uma ordem de acumulação,fold
requer uma ordem de acumulação e é essa ordem de acumulação que necessita de um valor zero, NÃO a existência do valor zero que os distingue. Estritamente falando,reduce
deve funcionar em uma coleção vazia, porque seu valor zero pode ser deduzido tomando um valor arbitráriox
e depois resolvendox op y = x
, mas isso não funciona com uma operação não comutativa, pois pode haver um valor zero à esquerda e à direita que são distintos (ou sejax op y != y op x
). É claro que Scala não se preocupa em descobrir qual é esse valor zero, pois isso exigiria alguns cálculos matemáticos (que provavelmente são incomputáveis), então apenas lança uma exceção.Parece (como é frequentemente o caso na etimologia) que este significado matemático original se perdeu, uma vez que a única diferença óbvia na programação é a assinatura. O resultado é que
reduce
se tornou sinônimo defold
, em vez de preservar seu significado original do MapReduce. Agora, esses termos são freqüentemente usados de forma intercambiável e se comportam da mesma forma na maioria das implementações (ignorando coleções vazias). A estranheza é exacerbada por peculiaridades, como no Spark, que abordaremos agora.Assim faísca que têm um
fold
, mas a ordem pela qual sub resultados (uma para cada partição) são combinadas (no momento de escrita) é da mesma ordem em que as tarefas são concluídas - e, portanto, não-determinística. Agradeço a @CafeFeed por apontar essefold
usorunJob
, que após ler o código percebi que não é determinístico. Mais confusão é criada pelo Spark ter umtreeReduce
mas nãotreeFold
.Conclusão
Há uma diferença entre
reduce
efold
mesmo quando aplicado a sequências não vazias. O primeiro é definido como parte do paradigma de programação MapReduce em coleções com ordem arbitrária ( http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf ) e deve-se assumir que os operadores são comutativos além de serem associativo para fornecer resultados determinísticos. Este último é definido em termos de catomorfismos e requer que as coleções tenham uma noção de sequência (ou sejam definidas recursivamente, como listas encadeadas), portanto, não requerem operadores comutativos.Na prática, devido à natureza não matemática da programação,
reduce
efold
tendem a se comportar da mesma forma, seja corretamente (como no Scala) ou incorretamente (como no Spark).Extra: Minha opinião sobre a API Spark
Minha opinião é que a confusão seria evitada se o uso do termo
fold
fosse completamente abandonado no Spark. Pelo menos o Spark tem uma observação em sua documentação:fonte
foldLeft
contém oLeft
em seu nome e também há um método chamadofold
..par
,(List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _)
obtenho resultados diferentes a cada vez.reallyFold
porém , alguém poderia escrever seu próprio cafetão, como :, issordd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f)
não precisaria f para comutar.Se não estou enganado, mesmo que a API do Spark não exija, o fold também exige que f seja comutativo. Porque a ordem em que as partições serão agregadas não é garantida. Por exemplo, no código a seguir, apenas a primeira impressão é classificada:
Imprimir:
a B C D e F G H I J K L M N o p q R S T U V W x y Z
abcghituvjklmwxyzqrsdefnop
defghinopjklmqrstuvabcwxyz
fonte
sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _)
com 2+ núcleos várias vezes, acho que verá que ele produz uma ordem aleatória (em termos de partição). Eu atualizei minha resposta de acordo.fold
no Apache Spark não é o mesmo quefold
em coleções não distribuídas. Na verdade , requer função comutativa para produzir resultados determinísticos:Isso foi mostrado por Mishael Rosenthal e sugerido por Make42 em seu comentário .
Foi sugerido que o comportamento observado está relacionado a
HashPartitioner
quando, na verdadeparallelize
, não embaralha e não usaHashPartitioner
.Explicado:
Estrutura de
fold
para RDDé o mesmo que a estrutura de
reduce
para RDD:onde
runJob
é executado sem levar em consideração a ordem de partição e resulta na necessidade de função comutativa.foldPartition
ereducePartition
são equivalentes em termos de ordem de processamento e efetivamente (por herança e delegação) implementados porreduceLeft
efoldLeft
emTraversableOnce
.Conclusão:
fold
no RDD não pode depender da ordem dos chunks e precisa de comutatividade e associatividade .fonte
fold
onRDD
s é realmente igual areduce
, mas isso não respeita as diferenças matemáticas básicas (atualizei minha resposta para ficar ainda mais claro). Embora eu discorde que realmente precisamos de comutatividade, desde que se tenha certeza de tudo o que seu parceiro está fazendo, ela está preservando a ordem.runJob
código, vejo que de fato ele faz a combinação de acordo com quando uma tarefa é concluída, NÃO a ordem das partições. É esse detalhe importante que faz com que tudo se encaixe. Editei minha resposta novamente e, assim, corrigi o erro que você apontou. Por favor, você poderia remover sua recompensa, já que agora estamos de acordo?Uma outra diferença para Scalding é o uso de combinadores no Hadoop.
Imagine que sua operação seja monóide comutativa, com a redução ela será aplicada no lado do mapa também, em vez de embaralhar / classificar todos os dados para redutores. Com foldLeft não é esse o caso.
É sempre uma boa prática definir suas operações como monóide no Scalding.
fonte