Eu trabalho em um dataframe com duas colunas, mvv e count.
+---+-----+
|mvv|count|
+---+-----+
| 1 | 5 |
| 2 | 9 |
| 3 | 3 |
| 4 | 1 |
Eu gostaria de obter duas listas contendo valores de mvv e valor de contagem. Algo como
mvv = [1,2,3,4]
count = [5,9,3,1]
Então, tentei o seguinte código: A primeira linha deve retornar uma lista python de linhas. Eu queria ver o primeiro valor:
mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)
Mas recebo uma mensagem de erro com a segunda linha:
AttributeError: getInt
python
apache-spark
pyspark
spark-dataframe
a.moussa
fonte
fonte
list(df.select('mvv').toPandas()['mvv'])
. O Arrow foi integrado ao PySpark, que aumentoutoPandas
significativamente. Não use as outras abordagens se estiver usando o Spark 2.3+. Veja minha resposta para mais detalhes de benchmarking.Respostas:
Veja, por que esse jeito que você está fazendo não está funcionando. Primeiro, você está tentando obter um inteiro de um tipo de linha , a saída de sua coleta é assim:
Se você pegar algo assim:
Você obterá o
mvv
valor. Se você quiser todas as informações do array, pode pegar algo assim:Mas se você tentar o mesmo para a outra coluna, obterá:
Isso acontece porque
count
é um método integrado. E a coluna tem o mesmo nome quecount
. Uma solução alternativa para fazer isso é alterar o nome da coluna decount
para_count
:Mas essa solução alternativa não é necessária, pois você pode acessar a coluna usando a sintaxe do dicionário:
E finalmente funcionará!
fonte
select('count')
uso desta forma:count_list = [int(i.count) for i in mvv_list.collect()]
adicionarei o exemplo à resposta.[i.['count'] for i in mvv_list.collect()]
trabalha para tornar explícito o uso da coluna chamada 'count' e não acount
funçãoSeguir uma linha fornece a lista que você deseja.
fonte
Isso lhe dará todos os elementos como uma lista.
fonte
O código a seguir irá ajudá-lo
fonte
Em meus dados, obtive estes benchmarks:
0,52 s
0,271 s
0,427 s
O resultado é o mesmo
fonte
toLocalIterator
vezcollect
disso, será ainda mais eficiente em termos de memória[row[col] for row in data.toLocalIterator()]
Se você receber o erro abaixo:
Este código resolverá seus problemas:
fonte
Fiz uma análise de benchmarking e
list(mvv_count_df.select('mvv').toPandas()['mvv'])
é o método mais rápido. Estou muito surpreso.Eu executei as diferentes abordagens em conjuntos de dados de 100 mil / 100 milhões de linhas usando um cluster i3.xlarge de 5 nós (cada nó tem 30,5 GBs de RAM e 4 núcleos) com Spark 2.4.5. Os dados foram distribuídos uniformemente em 20 arquivos Parquet compactados com uma única coluna.
Aqui estão os resultados do benchmarking (tempos de execução em segundos):
Regras de ouro a serem seguidas ao coletar dados no nó do driver:
toPandas
foi significativamente melhorado no Spark 2.3 . Provavelmente não é a melhor abordagem se você estiver usando uma versão do Spark anterior à 2.3.Veja aqui mais detalhes / resultados de benchmarking.
fonte
Uma possível solução é usar a
collect_list()
função depyspark.sql.functions
. Isso agregará todos os valores da coluna em uma matriz pyspark que é convertida em uma lista Python quando coletada:fonte