Eu quero ler um monte de arquivos de texto em um local hdfs e executar o mapeamento nele em uma iteração usando o spark.
JavaRDD<String> records = ctx.textFile(args[1], 1);
é capaz de ler apenas um arquivo por vez.
Quero ler mais de um arquivo e processá-los como um único RDD. Quão?
apache-spark
user3705662
fonte
fonte
Path
opções se aplicam.sc.wholeTextFiles
é útil para dados que não são linha-delimitadosc.textFile(multipleCommaSeparatedDirs,320)
ele leva a19430
tarefas totais em vez de320
... ele se comporta comounion
que também leva a número insano de tarefas de muito baixo paralelismowholeTextFiles
. Qual é o seu caso de uso? Posso pensar em uma solução alternativa, desde que você use o mesmo número de partições que os arquivos ...Use da
union
seguinte maneira:Então o
bigRdd
é o RDD com todos os arquivos.fonte
Você pode usar uma única chamada textFile para ler vários arquivos. Scala:
fonte
sc.textFile(files.mkString(","))
Você pode usar isso
Primeiro, você pode obter um buffer / lista de caminhos S3:
Agora passe este objeto List para o seguinte trecho de código, observe: sc é um objeto do SQLContext
Agora você tem um RDD Unificado final, isto é, df
Opcional E você também pode reparticioná-lo em um único BigRDD
O reparticionamento sempre funciona: D
fonte
No PySpark, encontrei uma maneira útil adicional de analisar arquivos. Talvez haja um equivalente em Scala, mas não me sinto confortável o suficiente com uma tradução de trabalho. Com efeito, é uma chamada textFile com a adição de rótulos (no exemplo abaixo, a chave = nome do arquivo, valor = 1 linha do arquivo).
TextFile "Rotulado"
entrada:
output: array com cada entrada contendo uma tupla usando o nome do arquivo como chave e com value = cada linha do arquivo. (Tecnicamente, usando esse método, você também pode usar uma chave diferente, além do nome real do caminho do arquivo - talvez uma representação de hash para economizar memória). ie
Você também pode recombinar como uma lista de linhas:
Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
Ou recombine arquivos inteiros de volta para seqüências de caracteres únicas (neste exemplo, o resultado é o mesmo que você obtém de wholeTextFiles, mas com a sequência "file:" removida do caminho do arquivo.):
Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()
fonte
Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)
eu recebi o erro ieTypeError: 'PipelinedRDD' object is not iterable
. Meu entendimento é que, essa linha cria um RDD que é imutável, então eu queria saber como você pode anexá-lo a outra variável?você pode usar
aqui você obterá o caminho do seu arquivo e o conteúdo desse arquivo. para que você possa executar qualquer ação de um arquivo inteiro de cada vez que salve a sobrecarga
fonte
Todas as respostas estão corretas com
sc.textFile
Eu só estava me perguntando por que não
wholeTextFiles
Por exemplo, neste caso ...Uma limitação é que, precisamos carregar arquivos pequenos, caso contrário, o desempenho será ruim e poderá levar ao OOM.
Nota :
Referência adicional a visitar
fonte
sc.wholeTextFiles(folder).flatMap...
Existe uma solução limpa direta disponível. Use o método wholeTextFiles (). Isso exigirá um diretório e formará um par de valores-chave. O RDD retornado será um par RDD. Encontre abaixo a descrição dos documentos do Spark :
fonte
Experimente esta interface usada para gravar um DataFrame em sistemas de armazenamento externo (por exemplo, sistemas de arquivos, armazenamento de valores-chave, etc.). Use DataFrame.write () para acessar isso.
Novo na versão 1.4.
csv (caminho, modo = nenhum, compactação = nenhum, sep = nenhum, citação = nenhum, escape = nenhum, cabeçalho = nenhum, nullValue = nenhum, escapeQuotes = nenhum, quoteAll = nenhum, dateFormat = nenhum, timestampFormat = nenhum) Salva o conteúdo do DataFrame no formato CSV no caminho especificado.
Parâmetros: path - o caminho em qualquer modo de sistema de arquivos suportado pelo Hadoop - especifica o comportamento da operação de salvamento quando os dados já existem.
anexar: anexa o conteúdo deste DataFrame aos dados existentes. substituir: sobrescreve os dados existentes. ignorar: ignore silenciosamente esta operação se os dados já existirem. erro (caso padrão): lança uma exceção se os dados já existirem. compressão - codec de compressão a ser usado ao salvar em arquivo. Esse pode ser um dos nomes abreviados que não diferenciam maiúsculas de minúsculas (nenhum, bzip2, gzip, lz4, snappy e deflate). sep - define o caractere único como um separador para cada campo e valor. Se Nenhum estiver definido, ele usará o valor padrão,,. quote - define o caractere único usado para escapar dos valores entre aspas, onde o separador pode fazer parte do valor. Se Nenhum estiver definido, ele usará o valor padrão ". Se você desejar desativar as cotações, precisará definir uma sequência vazia. Escape - define o caractere único usado para escapar aspas dentro de um valor já citado. Se Nenhum estiver definido , ele usa o valor padrão, \ escapeQuotes - Um sinalizador que indica se os valores que contêm aspas sempre devem ser colocados entre aspas. Se Nenhum estiver definido, ele usará o valor padrão true, escapando a todos os valores que contêm um caractere de citação. quoteAll - Um sinalizador que indica se todos os valores devem sempre estar entre aspas. Se Nenhum estiver definido, ele usará o valor padrão false, somente valores de escape contendo um caractere de citação. header - escreve os nomes das colunas como a primeira linha. Se Nenhum estiver definido, ele usará o valor padrão, false. nullValue - define a representação de string de um valor nulo. Se Nenhum estiver definido, ele usará o valor padrão, sequência vazia. dateFormat - define a string que indica um formato de data. Os formatos de data personalizados seguem os formatos em java.text.SimpleDateFormat. Isso se aplica ao tipo de data. Se Nenhum estiver definido, ele usará o valor padrão, aaaa-MM-dd. timestampFormat - define a sequência que indica um formato de carimbo de data / hora. Os formatos de data personalizados seguem os formatos em java.text.SimpleDateFormat. Isso se aplica ao tipo de carimbo de data e hora. Se Nenhum estiver definido, ele usará o valor padrão, aaaa-MM-dd'T'HH: mm: ss.SSSZZ.
fonte
fonte