Qual é a diferença entre map e flatMap e um bom caso de uso para cada um?

249

Alguém pode me explicar a diferença entre map e flatMap e qual é um bom caso de uso para cada um?

O que significa "achatar os resultados"? Para que serve?

Eran Witkon
fonte
4
Como você adicionou a tag Spark, presumo que você esteja perguntando sobre RDD.mape RDD.flatMapno Apache Spark . Em geral, as operações de RDD do Spark são modeladas após as operações de coleta correspondentes do Scala. As respostas em stackoverflow.com/q/1059776/590203 , que discutem a distinção entre mape flatMapno Scala, podem ser úteis para você.
Josh Rosen
1
A maioria dos exemplos aqui parece assumir que o flatMap opera apenas na coleta, o que não é o caso.
Boon

Respostas:

195

Aqui está um exemplo da diferença, como uma spark-shellsessão:

Primeiro, alguns dados - duas linhas de texto:

val rdd = sc.parallelize(Seq("Roses are red", "Violets are blue"))  // lines

rdd.collect

    res0: Array[String] = Array("Roses are red", "Violets are blue")

Agora, maptransforma um RDD de comprimento N em outro RDD de comprimento N.

Por exemplo, ele mapeia de duas linhas em dois comprimentos de linha:

rdd.map(_.length).collect

    res1: Array[Int] = Array(13, 16)

Mas flatMap(em termos gerais) transforma um RDD de comprimento N em uma coleção de N coleções e depois as nivela em um único RDD de resultados.

rdd.flatMap(_.split(" ")).collect

    res2: Array[String] = Array("Roses", "are", "red", "Violets", "are", "blue")

Temos várias palavras por linha e várias linhas, mas acabamos com uma única matriz de palavras de saída

Apenas para ilustrar isso, o flatMapping de uma coleção de linhas para uma coleção de palavras se parece com:

["aa bb cc", "", "dd"] => [["aa","bb","cc"],[],["dd"]] => ["aa","bb","cc","dd"]

Os RDDs de entrada e saída serão, portanto, tipicamente de tamanhos diferentes para flatMap.

Se tentássemos usar mapcom nossa splitfunção, teríamos terminado com estruturas aninhadas (um RDD de arrays de palavras, com tipo RDD[Array[String]]) porque precisamos ter exatamente um resultado por entrada:

rdd.map(_.split(" ")).collect

    res3: Array[Array[String]] = Array(
                                     Array(Roses, are, red), 
                                     Array(Violets, are, blue)
                                 )

Finalmente, um caso especial útil é o mapeamento com uma função que pode não retornar uma resposta e, portanto, retorna um Option. Podemos usar flatMappara filtrar os elementos que retornam Nonee extrair os valores daqueles que retornam a Some:

val rdd = sc.parallelize(Seq(1,2,3,4))

def myfn(x: Int): Option[Int] = if (x <= 2) Some(x * 10) else None

rdd.flatMap(myfn).collect

    res3: Array[Int] = Array(10,20)

(observando aqui que uma opção se comporta como uma lista que possui um elemento ou zero elementos)

DNA
fonte
1
A chamada dividida no mapa daria ["a b c", "", "d"] => [["a","b","c"],[],["d"]]?
User2635088
1
Sim - (mas nota que a minha notação informal foi apenas pretende indicar uma coleção de algum tipo - no mapeamento fato de splitmais de uma lista de Cordas vai produzir uma lista de matrizes)
DNA
2
Obrigado por escrever-se, esta é a melhor explicação que eu tenho lido para distinguir a diferença entre o mesmo
Rajiv
97

Geralmente usamos o exemplo de contagem de palavras no hadoop. Vou pegar o mesmo caso de uso e vou usar mape flatMapveremos a diferença em como ele está processando os dados.

Abaixo está o arquivo de dados de amostra.

hadoop is fast
hive is sql on hdfs
spark is superfast
spark is awesome

O arquivo acima será analisado usando mape flatMap.

Usando map

>>> wc = data.map(lambda line:line.split(" "));
>>> wc.collect()
[u'hadoop is fast', u'hive is sql on hdfs', u'spark is superfast', u'spark is awesome']

A entrada possui 4 linhas e o tamanho da saída também é 4, ou seja, N elementos ==> N elementos.

Usando flatMap

>>> fm = data.flatMap(lambda line:line.split(" "));
>>> fm.collect()
[u'hadoop', u'is', u'fast', u'hive', u'is', u'sql', u'on', u'hdfs', u'spark', u'is', u'superfast', u'spark', u'is', u'awesome']

A saída é diferente do mapa.


Vamos atribuir 1 como valor para cada chave para obter a contagem de palavras.

  • fm: RDD criado usando flatMap
  • wc: RDD criado usando map
>>> fm.map(lambda word : (word,1)).collect()
[(u'hadoop', 1), (u'is', 1), (u'fast', 1), (u'hive', 1), (u'is', 1), (u'sql', 1), (u'on', 1), (u'hdfs', 1), (u'spark', 1), (u'is', 1), (u'superfast', 1), (u'spark', 1), (u'is', 1), (u'awesome', 1)]

Enquanto que flatMapno RDD wcfornecerá a saída indesejada abaixo:

>>> wc.flatMap(lambda word : (word,1)).collect()
[[u'hadoop', u'is', u'fast'], 1, [u'hive', u'is', u'sql', u'on', u'hdfs'], 1, [u'spark', u'is', u'superfast'], 1, [u'spark', u'is', u'awesome'], 1]

Você não pode obter a contagem de palavras se mapfor usada em vez de flatMap.

De acordo com a definição, a diferença entre mape flatMapé:

map: Retorna um novo RDD aplicando a função fornecida a cada elemento do RDD. A função mapretorna apenas um item.

flatMap: Semelhante a map, ele retorna um novo RDD aplicando uma função a cada elemento do RDD, mas a saída é nivelada.

ioga
fonte
14
Eu sinto que esta resposta é melhor que a resposta aceita.
Krishna
15
Por que diabos você criaria capturas de tela ilegíveis, quando poderia copiar e colar o texto de saída?
Nbubis
Então flatMap () é map () + "flatten" e eu sei que não faz muito sentido, mas existe algum tipo de função "flatten" que podemos usar após map ()?
burakongun 01/09/16
2
Seu código tem um erro de digitação enganoso. O resultado de .map(lambda line:line.split(" "))não é uma matriz de seqüências de caracteres. Você deve mudar data.collect() para wc.collecte verá uma matriz de matrizes.
swdev 31/05
1
Sim, mas o resultado do comando ainda está errado. você correu wc.collect()?
swdev
18

Se você está perguntando a diferença entre RDD.map e RDD.flatMap no Spark, o map transforma um RDD do tamanho N em outro do tamanho N. por exemplo.

myRDD.map(x => x*2)

por exemplo, se myRDD for composto de Duplas.

Enquanto o flatMap pode transformar o RDD em outro de tamanho diferente: por exemplo:

myRDD.flatMap(x =>new Seq(2*x,3*x))

que retornará um RDD do tamanho 2 * N ou

myRDD.flatMap(x =>if x<10 new Seq(2*x,3*x) else new Seq(x) )
Oussama
fonte
17

Tudo se resume à sua pergunta inicial: o que você quer dizer com achatamento ?

Quando você usa o flatMap, uma coleção "multidimensional" se torna uma coleção "unidimensional" .

val array1d = Array ("1,2,3", "4,5,6", "7,8,9")  
//array1d is an array of strings

val array2d = array1d.map(x => x.split(","))
//array2d will be : Array( Array(1,2,3), Array(4,5,6), Array(7,8,9) )

val flatArray = array1d.flatMap(x => x.split(","))
//flatArray will be : Array (1,2,3,4,5,6,7,8,9)

Você deseja usar um flatMap quando,

  • sua função de mapa resulta na criação de estruturas em várias camadas
  • mas tudo o que você deseja é uma estrutura unidimensional simples e plana, removendo TODOS os agrupamentos internos
ramu
fonte
15

Use test.mdcomo exemplo:

➜  spark-1.6.1 cat test.md
This is the first line;
This is the second line;
This is the last line.

scala> val textFile = sc.textFile("test.md")
scala> textFile.map(line => line.split(" ")).count()
res2: Long = 3

scala> textFile.flatMap(line => line.split(" ")).count()
res3: Long = 15

scala> textFile.map(line => line.split(" ")).collect()
res0: Array[Array[String]] = Array(Array(This, is, the, first, line;), Array(This, is, the, second, line;), Array(This, is, the, last, line.))

scala> textFile.flatMap(line => line.split(" ")).collect()
res1: Array[String] = Array(This, is, the, first, line;, This, is, the, second, line;, This, is, the, last, line.)

Se você usar o mapmétodo, obterá as linhas de test.md, para o flatMapmétodo, obterá o número de palavras.

O mapmétodo é semelhante flatMap, todos eles retornam um novo RDD. mapmétodo frequentemente usado para retornar um novo RDD, flatMapmétodo freqüentemente usado para dividir palavras.

pangpang
fonte
9

mapretorna RDD de igual número de elementos, enquanto flatMapnão pode.

Um exemplo de caso de uso paraflatMap Filtrar dados ausentes ou incorretos.

Um exemplo de caso de uso paramap Uso em uma ampla variedade de casos em que o número de elementos de entrada e saída é o mesmo.

number.csv

1
2
3
-
4
-
5

map.py adiciona todos os números em add.csv.

from operator import *

def f(row):
  try:
    return float(row)
  except Exception:
    return 0

rdd = sc.textFile('a.csv').map(f)

print(rdd.count())      # 7
print(rdd.reduce(add))  # 15.0

O flatMap.py usa flatMappara filtrar os dados ausentes antes da adição. Menos números são adicionados em comparação com a versão anterior.

from operator import *

def f(row):
  try:
    return [float(row)]
  except Exception:
    return []

rdd = sc.textFile('a.csv').flatMap(f)

print(rdd.count())      # 5
print(rdd.reduce(add))  # 15.0
wannik
fonte
8

map e flatMap são semelhantes, no sentido em que pegam uma linha do RDD de entrada e aplicam uma função nele. A diferença deles é que a função no mapa retorna apenas um elemento, enquanto a função no flatMap pode retornar uma lista de elementos (0 ou mais) como um iterador.

Além disso, a saída do flatMap é achatada. Embora a função no flatMap retorne uma lista de elementos, o flatMap retorna um RDD que possui todos os elementos da lista de maneira simples (não uma lista).

Bhasker
fonte
7

todos os exemplos são bons ... Aqui está uma boa ilustração visual ... cortesia de fonte: treinamento DataFlair de faísca

Mapa: um mapa é uma operação de transformação no Apache Spark. Aplica-se a cada elemento do RDD e retorna o resultado como novo RDD. No mapa, o desenvolvedor da operação pode definir sua própria lógica comercial personalizada. A mesma lógica será aplicada a todos os elementos do RDD.

A mapfunção Spark RDD utiliza um elemento como processo de entrada, de acordo com o código personalizado (especificado pelo desenvolvedor) e retorna um elemento por vez. O mapa transforma um RDD de comprimento N em outro RDD de comprimento N. Os RDDs de entrada e saída normalmente terão o mesmo número de registros.

insira a descrição da imagem aqui

Exemplo de mapuso do scala:

val x = spark.sparkContext.parallelize(List("spark", "map", "example",  "sample", "example"), 3)
val y = x.map(x => (x, 1))
y.collect
// res0: Array[(String, Int)] = 
//    Array((spark,1), (map,1), (example,1), (sample,1), (example,1))

// rdd y can be re writen with shorter syntax in scala as 
val y = x.map((_, 1))
y.collect
// res1: Array[(String, Int)] = 
//    Array((spark,1), (map,1), (example,1), (sample,1), (example,1))

// Another example of making tuple with string and it's length
val y = x.map(x => (x, x.length))
y.collect
// res3: Array[(String, Int)] = 
//    Array((spark,5), (map,3), (example,7), (sample,6), (example,7))

FlatMap:

A flatMapé uma operação de transformação. Aplica-se a cada elemento do RDD e retorna o resultado como novo RDD. É semelhante ao Map, mas o FlatMap permite retornar 0, 1 ou mais elementos da função map. Na operação do FlatMap, um desenvolvedor pode definir sua própria lógica comercial personalizada. A mesma lógica será aplicada a todos os elementos do RDD.

O que significa "achatar os resultados"?

Uma função FlatMap pega um elemento como processo de entrada, de acordo com o código personalizado (especificado pelo desenvolvedor) e retorna 0 ou mais elementos por vez. flatMap() transforma um RDD de comprimento N em outro RDD de comprimento M.

insira a descrição da imagem aqui

Exemplo de flatMapuso do scala:

val x = spark.sparkContext.parallelize(List("spark flatmap example",  "sample example"), 2)

// map operation will return Array of Arrays in following case : check type of res0
val y = x.map(x => x.split(" ")) // split(" ") returns an array of words
y.collect
// res0: Array[Array[String]] = 
//  Array(Array(spark, flatmap, example), Array(sample, example))

// flatMap operation will return Array of words in following case : Check type of res1
val y = x.flatMap(x => x.split(" "))
y.collect
//res1: Array[String] = 
//  Array(spark, flatmap, example, sample, example)

// RDD y can be re written with shorter syntax in scala as 
val y = x.flatMap(_.split(" "))
y.collect
//res2: Array[String] = 
//  Array(spark, flatmap, example, sample, example)
Ram Ghadiyaram
fonte
5

A diferença pode ser vista abaixo do código pyspark de exemplo:

rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: range(1, x)).collect()
Output:
[1, 1, 2, 1, 2, 3]


rdd.map(lambda x: range(1, x)).collect()
Output:
[[1], [1, 2], [1, 2, 3]]
awadhesh pathak
fonte
3

O Flatmap e o Map transformam a coleção.

Diferença:

map (func)
Retorna um novo conjunto de dados distribuído formado passando cada elemento da fonte por uma função func.

flatMap (func)
Semelhante ao mapa, mas cada item de entrada pode ser mapeado para 0 ou mais itens de saída (portanto, func deve retornar um Seq em vez de um único item).

A função de transformação:
map : Um elemento dentro -> um elemento fora.
flatMap : Um elemento dentro -> 0 ou mais elementos fora (uma coleção).

Ajit K'sagar
fonte
3

RDD.map retorna todos os elementos em uma única matriz

RDD.flatMap retorna elementos em matrizes da matriz

vamos supor que temos texto no arquivo text.txt como

Spark is an expressive framework
This text is to understand map and faltMap functions of Spark RDD

Usando mapa

val text=sc.textFile("text.txt").map(_.split(" ")).collect

resultado:

text: **Array[Array[String]]** = Array(Array(Spark, is, an, expressive, framework), Array(This, text, is, to, understand, map, and, faltMap, functions, of, Spark, RDD))

Usando flatMap

val text=sc.textFile("text.txt").flatMap(_.split(" ")).collect

resultado:

 text: **Array[String]** = Array(Spark, is, an, expressive, framework, This, text, is, to, understand, map, and, faltMap, functions, of, Spark, RDD)
veera
fonte
2

Para todos aqueles que queriam o PySpark relacionado:

Transformação de exemplo: flatMap

>>> a="hello what are you doing"
>>> a.split()

['Olá O que você está fazendo']

>>> b=["hello what are you doing","this is rak"]
>>> b.split()

Traceback (última chamada mais recente): o arquivo "", linha 1, no AttributeError: o objeto 'list' não tem atributo 'split'

>>> rline=sc.parallelize(b)
>>> type(rline)

>>> def fwords(x):
...     return x.split()


>>> rword=rline.map(fwords)
>>> rword.collect()

[['olá', 'o que', 'são', 'você', 'fazendo'], ['este', 'é', 'rak']]

>>> rwordflat=rline.flatMap(fwords)
>>> rwordflat.collect()

['olá', 'o que', 'são', 'você', 'fazendo', 'isso', 'é', 'rak']

Espero que ajude :)

Rakshith N Gowda
fonte
2

map: Retorna um novo RDDaplicando uma função a cada elemento do RDD. A função .map pode retornar apenas um item.

flatMap: Semelhante para mapear, ele retorna uma nova RDDpela aplicação de uma função a cada elemento do RDD, mas a saída é achatada.

Além disso, a função in flatMappode retornar uma lista de elementos (0 ou mais)

Por exemplo:

sc.parallelize([3,4,5]).map(lambda x: range(1,x)).collect()

Saída: [[1, 2], [1, 2, 3], [1, 2, 3, 4]]

sc.parallelize([3,4,5]).flatMap(lambda x: range(1,x)).collect()

Saída: aviso o / p é achatado em uma única lista [1, 2, 1, 2, 3, 1, 2, 3, 4]

Fonte: https://www.linkedin.com/pulse/difference-between-map-flatmap-transformations-spark-pyspark-pandey/

Pushkar Deshpande
fonte
-1

Diferença na saída do mapa e do flatMap:

1flatMap

val a = sc.parallelize(1 to 10, 5)

a.flatMap(1 to _).collect()

Resultado:

 1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10

2 map.:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

val b = a.map(_.length).collect()

Resultado:

3 6 6 3 8
Ashutosh S
fonte
-1
  • map (func) Retorna um novo conjunto de dados distribuído formado passando cada elemento da fonte através de uma função func declarada.so map () é um termo

enquanto

  • flatMap (func) Semelhante ao mapa, mas cada item de entrada pode ser mapeado para 0 ou mais itens de saída, portanto, o func deve retornar uma Sequência em vez de um único item.
Kondas Lamar Jnr
fonte