Como imprimir o conteúdo de RDD?

124

Estou tentando imprimir o conteúdo de uma coleção no console do Spark.

Eu tenho um tipo:

linesWithSessionId: org.apache.spark.rdd.RDD[String] = FilteredRDD[3]

E eu uso o comando:

scala> linesWithSessionId.map(line => println(line))

Mas isto é impresso:

res1: org.apache.spark.rdd.RDD [Unidade] = MappedRDD [4] no mapa em: 19

Como gravar o RDD no console ou salvá-lo no disco para que eu possa visualizar seu conteúdo?

céu azul
fonte
1
Oi! você leu os comentários na resposta, aceitos por você? Parece ser enganosa
dk14
2
@ dk14 concordou, eu transferido resposta aceita
azul-céu
RDD está sendo relegado como cidadãos de segunda classe, você deve usar o DataFrame e o showmétodo
Thomas Decaux

Respostas:

235

Se você deseja visualizar o conteúdo de um RDD, uma maneira é usar collect():

myRDD.collect().foreach(println)

Essa não é uma boa ideia, porém, quando o RDD possui bilhões de linhas. Use take()apenas alguns para imprimir:

myRDD.take(n).foreach(println)
Oussama
fonte
1
se eu usar o foreach no RDD (que possui milhões de linhas) para gravar o conteúdo no HDFS como um único arquivo, ele funcionará sem problemas no cluster?
Shankar
A razão que eu não estou usando saveAsTextFileno RDD é, eu preciso escrever o conteúdo RDD em mais de um arquivo, é por isso que eu estou usandoforeach
Shankar
Se você deseja salvar em um único arquivo, pode unir o RDD em uma partição antes de chamar saveAsTextFile, mas novamente isso pode causar problemas. Eu acho que a melhor opção é escrever em múltiplos arquivos em HDFS, em seguida, usar hdfs dfs --getmerge, a fim de mesclar os arquivos
Oussama
você disse que, quando foreach em um RDD, ele persistirá na RAM do driver, a declaração está correta? porque o que eu entendi é foreach será executado em cada trabalhador [cluster] e não no driver.
Shankar
saveAsTextFile gravará um arquivo por partição, que é o que você deseja (vários arquivos). Caso contrário, como sugere Oussama, você pode executar rdd.coalesce (1) .saveAsTextFile () para obter um arquivo. Se o RDD tiver muito poucas partições para o seu gosto, você pode tentar rdd.repartition (N) .saveAsTextFile () #
foghorn
49

A mapfunção é uma transformação , o que significa que o Spark não avaliará seu RDD até que você execute uma ação nele.

Para imprimi-lo, você pode usar foreach(que é uma ação):

linesWithSessionId.foreach(println)

Para gravá-lo no disco, você pode usar uma das saveAs...funções (ações estáticas) da API RDD

Fedragon
fonte
6
Talvez você precise mencionar collectpara que o RDD possa ser impresso no console.
zsxwing
1
foreach-se, em primeiro lugar "materializar" o RDD e depois executar printlnem cada elemento, por isso collectnão é realmente necessário aqui (embora você pode usá-lo, é claro) ...
fedragon
5
Na verdade, sem collect (), antes do foreach, não consigo ver nada no console.
Vittorio Cozzolino
3
Na verdade, ele funciona totalmente bem no meu shell Spark, mesmo no 1.2.0. Mas acho que sei de onde vem essa confusão: a pergunta original perguntou como imprimir um RDD no console do Spark (= shell), por isso presumi que ele executaria um trabalho local, caso em que foreachfunciona bem. Se você estiver executando um trabalho em um cluster e desejar imprimir o seu rdd, deverá collect(como indicado por outros comentários e respostas) para que seja enviado ao driver antes de printlnser executado. E usar takecomo sugerido por Oussama pode ser uma boa idéia se o seu RDD for muito grande.
fedragon
6
A resposta acima é ruim. Você deve inaceitá-lo. O Foreach não será impresso no console, ele será impresso nos nós dos trabalhadores. Se você tiver apenas um nó, o foreach funcionará. Mas se você possui apenas um nó, por que está usando o spark? Basta usar o SQL awk, ou Grep, ou algo muito mais simples. Então eu acho que a única resposta válida é coletar. Se a coleta for grande demais para você e você quiser apenas uma amostra, use as funções take ou head ou simillar, conforme descrito abaixo.
eshalev
12

Se você estiver executando isso em um cluster, printlnnão imprimirá de volta ao seu contexto. Você precisa trazer os RDDdados para sua sessão. Para fazer isso, você pode forçá-lo à matriz local e imprimi-lo:

linesWithSessionId.toArray().foreach(line => println(line))
Noé
fonte
12

Você pode converter seu RDDpara um DataFrameentão show().

// For implicit conversion from RDD to DataFrame
import spark.implicits._

fruits = sc.parallelize([("apple", 1), ("banana", 2), ("orange", 17)])

// convert to DF then show it
fruits.toDF().show()

Isso mostrará as 20 principais linhas de seus dados, portanto, o tamanho dos seus dados não deve ser um problema.

+------+---+                                                                    
|    _1| _2|
+------+---+
| apple|  1|
|banana|  2|
|orange| 17|
+------+---+
Wesam
fonte
1
Eu acho que éimport spark.implicits._
Ryan Hartman
O que a biblioteca foi usada aqui? Não consigo detectar toDFnem spark.implicits._no escopo da faísca.
Sergii
1

Provavelmente existem muitas diferenças arquitetônicas entre myRDD.foreach(println)e myRDD.collect().foreach(println)(não apenas 'coletar', mas também outras ações). Uma das diferenças que vi foi ao fazer myRDD.foreach(println), a saída será em uma ordem aleatória. Por exemplo: se meu rdd é proveniente de um arquivo de texto em que cada linha tem um número, a saída terá uma ordem diferente. Mas quando eu fiz myRDD.collect().foreach(println), a ordem permanece igual ao arquivo de texto.

Karan Gupta
fonte
1

Em python

   linesWithSessionIdCollect = linesWithSessionId.collect()
   linesWithSessionIdCollect

Isso imprimirá todo o conteúdo do RDD

Niranjan Molkeri
fonte
1
Obrigado, mas eu marcado esta pergunta com scala não python
azul-céu
1
c.take(10)

e a versão mais recente do Spark mostrará bem a tabela.

Harvey
fonte
1

Em vez de digitar cada vez, você pode;

[1] Crie um método de impressão genérico dentro do Spark Shell.

def p(rdd: org.apache.spark.rdd.RDD[_]) = rdd.foreach(println)

[2] Ou melhor ainda, usando implícitos, você pode adicionar a função à classe RDD para imprimir seu conteúdo.

implicit class Printer(rdd: org.apache.spark.rdd.RDD[_]) {
    def print = rdd.foreach(println)
}

Exemplo de uso:

val rdd = sc.parallelize(List(1,2,3,4)).map(_*2)

p(rdd) // 1
rdd.print // 2

Resultado:

2
6
4
8

Importante

Isso só faz sentido se você estiver trabalhando no modo local e com uma pequena quantidade de conjunto de dados. Caso contrário, você não poderá ver os resultados no cliente ou ficar sem memória devido ao grande resultado do conjunto de dados.

noego
fonte
0

Você também pode salvar como um arquivo: rdd.saveAsTextFile("alicia.txt")

Thomas Decaux
fonte
0

Na sintaxe java:

rdd.collect().forEach(line -> System.out.println(line));
ForeverLearner
fonte