Sou novo no Spark e estou tentando ler dados CSV de um arquivo com o Spark. Aqui está o que estou fazendo:
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
Eu esperaria que esta chamada me desse uma lista das duas primeiras colunas do meu arquivo, mas estou recebendo este erro:
File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
embora meu arquivo CSV tenha mais de uma coluna.
python
csv
apache-spark
pyspark
Kernael
fonte
fonte
csv
biblioteca integrada para lidar com todos os escapes porque simplesmente dividir por vírgula não funcionará se, digamos, houver vírgulas nos valores.","
.Spark 2.0.0+
Você pode usar a fonte de dados csv integrada diretamente:
ou
sem incluir quaisquer dependências externas.
Spark <2.0.0 :
Em vez de análise manual, que está longe de ser trivial em um caso geral, eu recomendaria
spark-csv
:Certifique-se que CSV faísca está incluído no caminho (
--packages
,--jars
,--driver-class-path
)E carregue seus dados da seguinte maneira:
Ele pode lidar com o carregamento, inferência de esquema, eliminação de linhas malformadas e não requer a passagem de dados do Python para a JVM.
Nota :
Se você conhece o esquema, é melhor evitar a inferência do esquema e transmiti-lo
DataFrameReader
. Supondo que você tenha três colunas - inteiro, duplo e string:fonte
pyspark --packages com.databricks:spark-csv_2.11:1.4.0
(certifique-se de alterar as versões do databricks / spark para as que você instalou).fonte
E mais uma opção que consiste em ler o arquivo CSV usando o Pandas e depois importar o DataFrame do Pandas para o Spark.
Por exemplo:
fonte
A simples divisão por vírgula também dividirá as vírgulas que estão dentro dos campos (por exemplo
a,b,"1,2,3",c
), portanto, não é recomendado. A resposta de zero323 é boa se você quiser usar a API DataFrames, mas se quiser manter a base do Spark, você pode analisar csvs em Python base com o módulo csv :EDIT: Como @muon mencionou nos comentários, isso tratará o cabeçalho como qualquer outra linha, portanto, você precisará extraí-lo manualmente. Por exemplo,
header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
(certifique-se de não modificarheader
antes de o filtro avaliar). Mas, neste ponto, provavelmente é melhor usar um analisador csv integrado.fonte
StringIO
.csv
pode usar qualquer iterável b)__next__
não deve ser usado diretamente e falhará na linha vazia. Dê uma olhada em flatMap c) Seria muito mais eficiente de usar emmapPartitions
vez de inicializar o leitor em cada linha :)rdd.mapPartitions(lambda x: csv.reader(x))
funciona enquantordd.map(lambda x: csv.reader(x))
lança um erro? Eu esperava que ambos jogassem o mesmoTypeError: can't pickle _csv.reader objects
. Também parece quemapPartitions
automaticamente chama algum equivalente a "readlines" nocsv.reader
objeto, onde commap
, eu precisei chamar__next__
explicitamente para obter as listas decsv.reader
. 2) OndeflatMap
entra? LigarmapPartitions
sozinho funcionou para mim.rdd.mapPartitions(lambda x: csv.reader(x))
funciona porquemapPartitions
espera umIterable
objeto. Se você quiser ser explícito, você pode compreender ou gerar expressão.map
sozinho não funciona porque não itera sobre o objeto. Daí minha sugestão de usar oflatMap(lambda x: csv.reader([x]))
que itera sobre o leitor. MasmapPartitions
é muito melhor aqui.Isto está em PYSPARK
Então você pode verificar
fonte
Se você deseja carregar csv como um dataframe, pode fazer o seguinte:
Funcionou bem para mim.
fonte
Isso está de acordo com o que JP Mercier sugeriu inicialmente sobre o uso de Pandas, mas com uma modificação importante: se você ler dados em Pandas em blocos, eles devem ser mais maleáveis. Isso significa que você pode analisar um arquivo muito maior do que o Pandas pode realmente manipular como uma única peça e passá-lo para o Spark em tamanhos menores. (Isso também responde ao comentário sobre por que alguém gostaria de usar o Spark se eles podem carregar tudo no Pandas de qualquer maneira.)
fonte
Agora, também há outra opção para qualquer arquivo csv geral: https://github.com/seahboonsiew/pyspark-csv da seguinte maneira:
Suponha que temos o seguinte contexto
Primeiro, distribua pyspark-csv.py para executores usando SparkContext
Leia dados csv via SparkContext e converta-os em DataFrame
fonte
Se seus dados csv não contiverem novas linhas em nenhum dos campos, você pode carregar seus dados com
textFile()
e analisá-losfonte
Se você tiver uma ou mais linha (s) com menos ou mais número de colunas do que 2 no conjunto de dados, este erro pode ocorrer.
Também sou novo no Pyspark e estou tentando ler arquivos CSV. O código a seguir funcionou para mim:
Neste código, estou usando o conjunto de dados do kaggle, o link é: https://www.kaggle.com/carrie1/ecommerce-data
1. Sem mencionar o esquema:
Agora verifique as colunas: sdfData.columns
A saída será:
Verifique o tipo de dados para cada coluna:
Isso fornecerá o quadro de dados com todas as colunas com tipo de dados como StringType
2. Com esquema: se você conhece o esquema ou deseja alterar o tipo de dados de qualquer coluna na tabela acima, use isso (digamos que estou tendo as seguintes colunas e as deseja em um tipo de dados específico para cada uma delas)
Agora verifique o esquema para o tipo de dados de cada coluna:
Editado: também podemos usar a seguinte linha de código sem mencionar o esquema explicitamente:
O resultado é:
A saída será semelhante a esta:
fonte
Ao usar
spark.read.csv
, acho que usar as opçõesescape='"'
emultiLine=True
fornecer a solução mais consistente para o padrão CSV e, em minha experiência, funciona melhor com arquivos CSV exportados do Planilhas Google.Isso é,
fonte
import pyspark as spark
?spark
já está inicializado. Em um script enviado porspark-submit
, você pode instanciá-lo comofrom pyspark.sql import SparkSession; spark = SparkSession.builder.getOrCreate()
.