Diferença entre reduz e foldLeft / fold na programação funcional (particularmente Scala e Scala APIs)?

Respostas:

260

reduzir vs foldLeft

Uma grande diferença, não mencionada em nenhuma outra resposta stackoverflow relacionada a este tópico claramente, é que reducedeve ser dado um monóide comutativo , ou seja, uma operação que é tanto comutativa quanto associativa. Isso significa que a operação pode ser paralelizada.

Essa distinção é muito importante para Big Data / MPP / computação distribuída e toda a razão pela qual reduceexiste. A coleção pode ser cortada e a reducelata operar em cada pedaço, então a reducelata operar nos resultados de cada pedaço - na verdade, o nível de pedaço não precisa parar um nível de profundidade. Poderíamos cortar cada pedaço também. É por isso que somar inteiros em uma lista é O (log N) se dado um número infinito de CPUs.

Se você apenas olhar para as assinaturas, não há razão para reduceexistir, porque você pode conseguir tudo o que pode reducecom um foldLeft. A funcionalidade de foldLefté maior do que a funcionalidade de reduce.

Mas você não pode paralelizar a foldLeft, então seu tempo de execução é sempre O (N) (mesmo se você alimentar um monóide comutativo). Isso ocorre porque é assumido que a operação não é um monóide comutativo e, portanto, o valor acumulado será calculado por uma série de agregações sequenciais.

foldLeftnão assume comutatividade nem associatividade. É a associatividade que dá a capacidade de fragmentar a coleção e é a comutatividade que facilita a acumulação porque a ordem não é importante (portanto, não importa a ordem de agregar cada um dos resultados de cada um dos blocos). Estritamente falando, a comutatividade não é necessária para a paralelização, por exemplo, algoritmos de classificação distribuída, ela apenas torna a lógica mais fácil porque você não precisa dar uma ordem aos seus pedaços.

Se você der uma olhada na documentação do Spark reduce, diz especificamente "... operador binário comutativo e associativo"

http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD

Aqui está a prova de que reduceNÃO é apenas um caso especial defoldLeft

scala> val intParList: ParSeq[Int] = (1 to 100000).map(_ => scala.util.Random.nextInt()).par

scala> timeMany(1000, intParList.reduce(_ + _))
Took 462.395867 milli seconds

scala> timeMany(1000, intParList.foldLeft(0)(_ + _))
Took 2589.363031 milli seconds

reduzir vs dobrar

Agora é aqui que fica um pouco mais perto das raízes matemáticas / FP, e um pouco mais complicado de explicar. Reduzir é definido formalmente como parte do paradigma MapReduce, que lida com coleções sem ordem (multisets), Fold é formalmente definido em termos de recursão (ver catamorfismo) e, portanto, assume uma estrutura / sequência para as coleções.

Não há nenhum foldmétodo em Scalding porque no modelo de programação (estrito) Map Reduce não podemos definir foldporque os chunks não têm uma ordem e foldrequerem apenas associatividade, não comutatividade.

Simplificando, reducefunciona sem uma ordem de acumulação, foldrequer uma ordem de acumulação e é essa ordem de acumulação que necessita de um valor zero, NÃO a existência do valor zero que os distingue. Estritamente falando, reduce deve funcionar em uma coleção vazia, porque seu valor zero pode ser deduzido tomando um valor arbitrário xe depois resolvendo x op y = x, mas isso não funciona com uma operação não comutativa, pois pode haver um valor zero à esquerda e à direita que são distintos (ou seja x op y != y op x). É claro que Scala não se preocupa em descobrir qual é esse valor zero, pois isso exigiria alguns cálculos matemáticos (que provavelmente são incomputáveis), então apenas lança uma exceção.

Parece (como é frequentemente o caso na etimologia) que este significado matemático original se perdeu, uma vez que a única diferença óbvia na programação é a assinatura. O resultado é que reducese tornou sinônimo de fold, em vez de preservar seu significado original do MapReduce. Agora, esses termos são freqüentemente usados ​​de forma intercambiável e se comportam da mesma forma na maioria das implementações (ignorando coleções vazias). A estranheza é exacerbada por peculiaridades, como no Spark, que abordaremos agora.

Assim faísca que têm um fold, mas a ordem pela qual sub resultados (uma para cada partição) são combinadas (no momento de escrita) é da mesma ordem em que as tarefas são concluídas - e, portanto, não-determinística. Agradeço a @CafeFeed por apontar esse folduso runJob, que após ler o código percebi que não é determinístico. Mais confusão é criada pelo Spark ter um treeReducemas não treeFold.

Conclusão

Há uma diferença entre reducee foldmesmo quando aplicado a sequências não vazias. O primeiro é definido como parte do paradigma de programação MapReduce em coleções com ordem arbitrária ( http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf ) e deve-se assumir que os operadores são comutativos além de serem associativo para fornecer resultados determinísticos. Este último é definido em termos de catomorfismos e requer que as coleções tenham uma noção de sequência (ou sejam definidas recursivamente, como listas encadeadas), portanto, não requerem operadores comutativos.

Na prática, devido à natureza não matemática da programação, reducee foldtendem a se comportar da mesma forma, seja corretamente (como no Scala) ou incorretamente (como no Spark).

Extra: Minha opinião sobre a API Spark

Minha opinião é que a confusão seria evitada se o uso do termo foldfosse completamente abandonado no Spark. Pelo menos o Spark tem uma observação em sua documentação:

Isso se comporta de maneira um pouco diferente das operações de dobra implementadas para coleções não distribuídas em linguagens funcionais como Scala.

o melhor
fonte
2
É por isso que foldLeftcontém o Leftem seu nome e também há um método chamado fold.
Kiritsuku
1
@Cloudtech Isso é uma coincidência de sua implementação de thread único, não dentro de suas especificações. Na minha máquina de 4 núcleos, se tento adicionar .par, (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _)obtenho resultados diferentes a cada vez.
samthebest
2
@AlexDean no contexto da ciência da computação, não, realmente não precisa de uma identidade, já que coleções vazias tendem a apenas lançar exceções. Mas é matematicamente mais elegante (e seria mais elegante se as coleções fizessem isso) se o elemento de identidade fosse retornado quando a coleção estivesse vazia. Em matemática, "lançar uma exceção" não existe.
samthebest
3
@samthebest: Você tem certeza sobre a comutatividade? github.com/apache/spark/blob/… diz "Para funções que não são comutativas, o resultado pode ser diferente de uma dobra aplicada a uma coleção não distribuída."
42 de
1
@ Make42 Correto, reallyFoldporém , alguém poderia escrever seu próprio cafetão, como :, isso rdd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f)não precisaria f para comutar.
samthebest
10

Se não estou enganado, mesmo que a API do Spark não exija, o fold também exige que f seja comutativo. Porque a ordem em que as partições serão agregadas não é garantida. Por exemplo, no código a seguir, apenas a primeira impressão é classificada:

import org.apache.spark.{SparkConf, SparkContext}

object FoldExample extends App{

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Simple Application")
  implicit val sc = new SparkContext(conf)

  val range = ('a' to 'z').map(_.toString)
  val rdd = sc.parallelize(range)

  println(range.reduce(_ + _))
  println(rdd.reduce(_ + _))
  println(rdd.fold("")(_ + _))
}  

Imprimir:

a B C D e F G H I J K L M N o p q R S T U V W x y Z

abcghituvjklmwxyzqrsdefnop

defghinopjklmqrstuvabcwxyz

Mishael Rosenthal
fonte
Depois de algumas idas e vindas, acreditamos que você está correto. A ordem de combinação é o primeiro a chegar, primeiro a servir. Se você executar sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _)com 2+ núcleos várias vezes, acho que verá que ele produz uma ordem aleatória (em termos de partição). Eu atualizei minha resposta de acordo.
samthebest
3

foldno Apache Spark não é o mesmo que foldem coleções não distribuídas. Na verdade , requer função comutativa para produzir resultados determinísticos:

Isso se comporta de maneira um pouco diferente das operações de dobra implementadas para coleções não distribuídas em linguagens funcionais como Scala. Esta operação de dobrar pode ser aplicada a partições individualmente e, em seguida, dobrar esses resultados no resultado final, em vez de aplicar a dobra a cada elemento sequencialmente em alguma ordem definida. Para funções que não são comutativas, o resultado pode ser diferente de uma dobra aplicada a uma coleção não distribuída.

Isso foi mostrado por Mishael Rosenthal e sugerido por Make42 em seu comentário .

Foi sugerido que o comportamento observado está relacionado a HashPartitionerquando, na verdade parallelize, não embaralha e não usa HashPartitioner.

import org.apache.spark.sql.SparkSession

/* Note: standalone (non-local) mode */
val master = "spark://...:7077"  

val spark = SparkSession.builder.master(master).getOrCreate()

/* Note: deterministic order */
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })

/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)

Explicado:

Estrutura defold para RDD

def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  var jobResult: T
  val cleanOp: (T, T) => T
  val foldPartition = Iterator[T] => T
  val mergeResult: (Int, T) => Unit
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}

é o mesmo que a estrutura dereduce para RDD:

def reduce(f: (T, T) => T): T = withScope {
  val cleanF: (T, T) => T
  val reducePartition: Iterator[T] => Option[T]
  var jobResult: Option[T]
  val mergeResult =  (Int, Option[T]) => Unit
  sc.runJob(this, reducePartition, mergeResult)
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

onde runJobé executado sem levar em consideração a ordem de partição e resulta na necessidade de função comutativa.

foldPartitione reducePartitionsão equivalentes em termos de ordem de processamento e efetivamente (por herança e delegação) implementados por reduceLefte foldLeftem TraversableOnce.

Conclusão: foldno RDD não pode depender da ordem dos chunks e precisa de comutatividade e associatividade .

user6022341
fonte
Tenho que admitir que a etimologia é confusa e a literatura de programação carece de definições formais. Acho que é seguro dizer que foldon RDDs é realmente igual a reduce, mas isso não respeita as diferenças matemáticas básicas (atualizei minha resposta para ficar ainda mais claro). Embora eu discorde que realmente precisamos de comutatividade, desde que se tenha certeza de tudo o que seu parceiro está fazendo, ela está preservando a ordem.
samthebest
A ordem de dobra indefinida não está relacionada ao particionamento. É uma consequência direta de uma implementação runJob.
AH! Desculpe, não consegui descobrir qual era o seu ponto, mas depois de ler o runJobcódigo, vejo que de fato ele faz a combinação de acordo com quando uma tarefa é concluída, NÃO a ordem das partições. É esse detalhe importante que faz com que tudo se encaixe. Editei minha resposta novamente e, assim, corrigi o erro que você apontou. Por favor, você poderia remover sua recompensa, já que agora estamos de acordo?
samthebest
Não posso editar ou remover - essa opção não existe. Posso premiar, mas acho que você consegue alguns pontos apenas com a atenção, estou errado? Se você confirmar que quer uma recompensa, eu o faço nas próximas 24 horas. Obrigado pelas correções e desculpe por um método, mas parece que você ignora todos os avisos, é uma grande coisa, e a resposta foi citada em todos os lugares.
1
Que tal você conceder a @Mishael Rosenthal, já que ele foi o primeiro a expressar claramente a preocupação. Não tenho interesse nos pontos, só gosto de usar o SO para SEO e organização.
samthebest
2

Uma outra diferença para Scalding é o uso de combinadores no Hadoop.

Imagine que sua operação seja monóide comutativa, com a redução ela será aplicada no lado do mapa também, em vez de embaralhar / classificar todos os dados para redutores. Com foldLeft não é esse o caso.

pipe.groupBy('product) {
   _.reduce('price -> 'total){ (sum: Double, price: Double) => sum + price }
   // reduce is .mapReduceMap in disguise
}

pipe.groupBy('product) {
   _.foldLeft('price -> 'total)(0.0){ (sum: Double, price: Double) => sum + price }
}

É sempre uma boa prática definir suas operações como monóide no Scalding.

Morazow
fonte