Apache Spark: map vs mapPartitions?

133

Qual a diferença entre um RDD map e um mapPartitionsmétodo? E se flatMapcomporta como mapou como mapPartitions? Obrigado.

(editar) ie qual é a diferença (semanticamente ou em termos de execução) entre

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

E:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
Nicholas White
fonte
3
Depois de ler a resposta abaixo, você pode conferir [essa experiência] compartilhada por alguém que realmente a usou. ( Bzhangusc.wordpress.com/2014/06/19/… ) bzhangusc.wordpress.com/2014/06/19 / ...
Abhidemon

Respostas:

121

Qual é a diferença entre o mapa de um RDD e o método mapPartitions?

O mapa do método converte cada elemento do RDD de origem em um único elemento do RDD resultante, aplicando uma função. mapPartitions converte cada partição do RDD de origem em vários elementos do resultado (possivelmente nenhum).

E o flatMap se comporta como map ou como mapPartitions?

O flatMap também não funciona em um único elemento (as map) e produz vários elementos do resultado (as mapPartitions).

Alexey Romanov
fonte
3
Obrigado - o mapa causa shuffles (ou altera o número de partições)? Move dados entre nós? Eu tenho usado o mapPartitions para evitar a movimentação de dados entre nós, mas não tinha certeza se o flapMap faria isso.
Nicholas White
Se você olhar para a fonte - github.com/apache/incubator-spark/blob/… e github.com/apache/incubator-spark/blob/… - ambos mape flatMaptêm exatamente as mesmas partições que o pai.
Alexey Romanov
13
Como observação, uma apresentação fornecida por um palestrante no San Francisco Spark Summit 2013 (goo.gl/JZXDCR) destaca que as tarefas com alto custo por registro apresentam melhor desempenho com uma mapPartition do que com uma transformação de mapa. Isso é, de acordo com a apresentação, devido ao alto custo de configurar uma nova tarefa.
Mikel Urkia
1
Estou vendo o contrário - mesmo com operações muito pequenas, é mais rápido chamar mapPartitions e iterar do que chamar map. Estou assumindo que esta é apenas a sobrecarga de iniciar o mecanismo de linguagem que processará a tarefa de mapeamento. (Estou no R, que pode ter mais sobrecarga de inicialização.) Se você executasse várias operações, o mapPartitions parece ser um pouco mais rápido - presumo que seja porque ele lê o RDD apenas uma vez. Mesmo se o RDD estiver armazenado em cache na RAM, isso economiza muita sobrecarga na conversão de tipo.
27415 Bob
3
mapbasicamente pega sua função fe a passa para iter.map(f). Então, basicamente, é um método de conveniência que envolve mapPartitions. Eu ficaria surpreso se houvesse uma vantagem de desempenho de qualquer maneira para um trabalho de transformação de estilo de mapa puro (ou seja, onde a função é idêntica), se você precisar criar alguns objetos para processamento, se esses objetos puderem ser compartilhados, mapPartitionsseria vantajoso.
NightWolf
129

Criança levada. DICA :

Sempre que você tiver uma inicialização pesada que deve ser feita uma vez para muitos RDDelementos, e não uma vez por RDDelemento, e se essa inicialização, como a criação de objetos a partir de uma biblioteca de terceiros, não puder ser serializada (para que o Spark possa transmiti-lo pelo cluster para os nós do trabalhador), use em mapPartitions()vez de map(). mapPartitions()prevê que a inicialização seja feita uma vez por tarefa / thread / partição do trabalhador, em vez de uma vez por RDDelemento de dados, por exemplo: veja abaixo.

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})

Q2 se flatMapcomporta como mapa ou comomapPartitions ?

Sim. veja o exemplo 2 deflatmap .. é auto-explicativo.

Q1 Qual é a diferença entre um RDDmap e ummapPartitions

map trabalha a função que está sendo utilizada no nível por elemento enquanto mapPartitions exerce a função no nível da partição.

Cenário de exemplo : se tivermos 100 mil elementos em um determinadoRDD partição , dispararemos a função que está sendo usada pela transformação de mapeamento 100K vezes quando usamos map.

Por outro lado, se usarmos mapPartitions , chamaremos a função específica apenas uma vez, mas passaremos todos os 100 mil registros e receberemos de volta todas as respostas em uma chamada de função.

Haverá ganho de desempenho desde map funciona em uma função específica tantas vezes, especialmente se a função estiver fazendo algo caro a cada vez que não seria necessário se passássemos todos os elementos de uma só vez (no caso de mappartitions).

mapa

Aplica uma função de transformação em cada item do RDD e retorna o resultado como um novo RDD.

Variantes de listagem

mapa de def [U: ClassTag] (f: T => U): RDD [U]

Exemplo:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

Este é um mapa especializado chamado apenas uma vez para cada partição. Todo o conteúdo das respectivas partições está disponível como um fluxo sequencial de valores por meio do argumento de entrada (Iterarator [T]). A função personalizada deve retornar outro iterador [U]. Os iteradores de resultados combinados são convertidos automaticamente em um novo RDD. Observe que as tuplas (3,4) e (6,7) estão ausentes no resultado a seguir devido ao particionamento escolhido.

preservesPartitioningindica se a função de entrada preserva o particionador, que deve ser, a falsemenos que seja um par RDD e a função de entrada não modifique as chaves.

Variantes de listagem

def mapPartitions [U: ClassTag] (f: Iterador [T] => Iterador [U], preservesPartitioning: Boolean = false): RDD [U]

Exemplo 1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   {
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   }
   res.iterator
 }
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

Exemplo 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
   var res = List[Int]()
   while (iter.hasNext) {
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   }
   res.iterator
 }
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

O programa acima também pode ser escrito usando o flatMap da seguinte maneira.

Exemplo 2 usando flatmap

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

Conclusão:

mapPartitionsa transformação é mais rápida do que mapuma vez que chama sua função uma vez / partição, não uma vez / elemento.

Leitura adicional: foreach Vs foreachPartitions Quando usar o quê?

Ram Ghadiyaram
fonte
4
Eu sei que você pode usar mapou mapPartitionsobter o mesmo resultado (veja os dois exemplos na pergunta); Esta pergunta é sobre por que você escolheria um caminho sobre o outro. Os comentários na outra resposta são realmente úteis! Além disso, você não mencionou que mape flatMappassar falsepara preservesPartitioning, e quais as implicações disso são.
Nicholas White
2
a função executada todas as vezes versus a função executada uma vez para a parição era o link que estava faltando. Ter acesso a mais de um registro de dados por vez com mapPartition é algo inestimável. agradeço a resposta
Ponto e vírgula e fita adesiva
1
Existe um cenário em que mapé melhor que mapPartitions? Se mapPartitionsé tão bom, por que não é a implementação de mapa padrão?
ruhong 17/03/19
1
@oneleggedmule: ambos são para requisitos diferentes, temos que usar sabiamente se você estiver instanciando recursos, como conexões db (como mostrado no exemplo acima), que são caros, então o mappartitions é a abordagem correta, já que uma conexão por partição. Também saveAsTextFile mappartitions usados internamente ver
Ram Ghadiyaram
@oneleggedmule Do meu ponto de vista, map () é mais fácil de entender e aprender, e também é um método comum de muitos idiomas diferentes. Pode ser mais fácil de usar do que mapPartitions () se alguém não estiver familiarizado com esse método específico do Spark no início. Se não houver diferença de desempenho, prefiro usar map ().
Raymond Chen
15

Mapa :

  1. Ele processa uma linha por vez, muito semelhante ao método map () do MapReduce.
  2. Você retorna da transformação após cada linha.

MapPartitions

  1. Ele processa a partição completa de uma só vez.
  2. Você pode retornar da função apenas uma vez após o processamento de toda a partição.
  3. Todos os resultados intermediários precisam ser mantidos na memória até que você processe toda a partição.
  4. Fornece as funções setup () map () e cleanup () do MapReduce

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/

KrazyGautam
fonte
em relação a 2 - se você estiver executando transformações de iterador para iterador e não materializando o iterador em uma coleção de algum tipo, não será necessário manter a partição inteira na memória; de fato, dessa forma, o Spark será capaz de derrame partes da partição no disco.
ilcord
4
Você não precisa manter a partição inteira na memória, mas o resultado. Você não pode retornar o resultado até que você tenha processado a partição inteira
KrazyGautam