Comecei a usar o Spark SQL e os DataFrames no Spark 1.4.0. Estou querendo definir um particionador personalizado no DataFrames, no Scala, mas não vendo como fazer isso.
Uma das tabelas de dados com as quais estou trabalhando contém uma lista de transações, por conta, silimar no exemplo a seguir.
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
Pelo menos inicialmente, a maioria dos cálculos ocorrerá entre as transações em uma conta. Então, eu gostaria de ter os dados particionados para que todas as transações de uma conta estejam na mesma partição Spark.
Mas não estou vendo uma maneira de definir isso. A classe DataFrame possui um método chamado 'repartition (Int)', onde você pode especificar o número de partições a serem criadas. Mas não estou vendo nenhum método disponível para definir um particionador personalizado para um DataFrame, como pode ser especificado para um RDD.
Os dados de origem são armazenados no Parquet. Eu vi que ao escrever um DataFrame no Parquet, você pode especificar uma coluna para particionar, então presumivelmente eu poderia dizer ao Parquet para particionar seus dados pela coluna 'Conta'. Mas pode haver milhões de contas e, se eu estiver entendendo o Parquet corretamente, ele criará um diretório distinto para cada conta, para que não pareça uma solução razoável.
Existe uma maneira de fazer com que o Spark particione esse DataFrame para que todos os dados de uma conta estejam na mesma partição?
int(account/someInteger)
e, assim, obter um número razoável de contas por diretório.partitionBy(Partitioner)
método, mas por DataFrames em vez de RDDs. Agora vejo que issopartitionBy
está disponível apenas para RDDs de pares , não sei por que.Respostas:
Spark> = 2.3.0
O SPARK-22614 expõe o particionamento de intervalo.
O SPARK-22389 expõe o particionamento de formato externo na API de fonte de dados v2 .
Spark> = 1.6.0
No Spark> = 1.6, é possível usar o particionamento por coluna para consulta e armazenamento em cache. Consulte: SPARK-11410 e SPARK-4849 usando o
repartition
método:Ao contrário do
RDDs
SparkDataset
(incluindoDataset[Row]
akaDataFrame
), não é possível usar o particionador personalizado por enquanto. Normalmente, você pode resolver isso criando uma coluna de particionamento artificial, mas ela não oferece a mesma flexibilidade.Spark <1.6.0:
Uma coisa que você pode fazer é pré-particionar os dados de entrada antes de criar um
DataFrame
Como a
DataFrame
criação de umRDD
requer apenas uma fase simples do mapa, o layout da partição existente deve ser preservado *:Da mesma maneira que você pode reparticionar os existentes
DataFrame
:Então parece que não é impossível. A questão permanece se faz sentido. Argumentarei que na maioria das vezes isso não acontece:
O reparticionamento é um processo caro. Em um cenário típico, a maioria dos dados precisa ser serializada, embaralhada e desserializada. Por outro lado, o número de operações que podem se beneficiar de dados pré-particionados é relativamente pequeno e é ainda mais limitado se a API interna não for projetada para aproveitar essa propriedade.
GROUP BY
- é possível reduzir a pegada de memória dos buffers temporários **, mas o custo geral é muito maior. Mais ou menos equivalente agroupByKey.mapValues(_.reduce)
(comportamento atual) vsreduceByKey
(pré-particionamento). É improvável que seja útil na prática.SqlContext.cacheTable
. Como parece que está usando a codificação de duração da execução, a aplicaçãoOrderedRDDFunctions.repartitionAndSortWithinPartitions
pode melhorar a taxa de compactação.O desempenho é altamente dependente da distribuição das chaves. Se estiver inclinado, resultará em uma utilização abaixo do ideal. Na pior das hipóteses, será impossível concluir o trabalho.
Conceitos relacionados
Particionando com origens JDBC :
As fontes de dados JDBC suportam
predicates
argumento . Pode ser usado da seguinte maneira:Ele cria uma única partição JDBC por predicado. Lembre-se de que se os conjuntos criados usando predicados individuais não forem disjuntos, você verá duplicatas na tabela resultante.
partitionBy
método emDataFrameWriter
:O Spark
DataFrameWriter
fornece umpartitionBy
método que pode ser usado para "particionar" dados na gravação. Ele separa os dados na gravação usando o conjunto de colunas fornecidoIsso permite que o envio de predicado seja lido para consultas com base na chave:
mas não é equivalente a
DataFrame.repartition
. Em agregações específicas como:ainda exigirá
TungstenExchange
:bucketBy
método emDataFrameWriter
(Spark> = 2.0):bucketBy
possui aplicativos semelhantes,partitionBy
mas está disponível apenas para tabelas (saveAsTable
). As informações de bucket podem ser usadas para otimizar junções:* Por layout da partição, quero dizer apenas uma distribuição de dados.
partitioned
O RDD não possui mais um particionador. ** Assumindo que não haja projeção antecipada. Se a agregação cobre apenas um pequeno subconjunto de colunas, provavelmente não há ganho algum.fonte
DataFrameWriter.partitionBy
logicamente não é o mesmo queDataFrame.repartition
. O primeiro em não embaralha, simplesmente separa a saída. Em relação à primeira pergunta.- os dados são salvos por partição e não há shuffle. Você pode verificar isso facilmente lendo arquivos individuais. Mas o Spark sozinho não tem como saber se é isso que você realmente deseja.No Spark <1.6 Se você criar um
HiveContext
, não o antigo,SqlContext
pode usar o HiveQLDISTRIBUTE BY colX...
(garante que cada um dos N redutores obtenha intervalos sem sobreposição de x) &CLUSTER BY colX...
(atalho para Distribuir por e Classificar por), por exemplo;Não tenho certeza de como isso se encaixa na API do Spark DF. Essas palavras-chave não são suportadas no SqlContext normal (observe que você não precisa ter um meta store da seção para usar o HiveContext)
EDIT: Spark 1.6+ agora tem isso na API nativa do DataFrame
fonte
Então, para começar com algum tipo de resposta:) - Você não pode
Eu não sou um especialista, mas, tanto quanto eu entendo os DataFrames, eles não são iguais ao rdd e o DataFrame não possui o Partitioner.
Geralmente, a idéia do DataFrame é fornecer outro nível de abstração que lide com esses problemas. As consultas no DataFrame são convertidas em plano lógico que é convertido em operações em RDDs. O particionamento que você sugeriu provavelmente será aplicado automaticamente ou pelo menos deveria ser.
Se você não confiar no SparkSQL, ele fornecerá algum tipo de trabalho ideal, sempre é possível transformar o DataFrame em RDD [Row], conforme sugerido nos comentários.
fonte
Use o DataFrame retornado por:
Não há uma maneira explícita de usar
partitionBy
em um DataFrame, apenas em um PairRDD, mas quando você classifica um DataFrame, ele será usado no LogicalPlan e ajudará quando você precisar fazer cálculos em cada Conta.Acabei de me deparar com o mesmo problema exato, com um quadro de dados que quero particionar por conta. Eu suponho que quando você diz "deseja que os dados sejam particionados para que todas as transações de uma conta estejam na mesma partição Spark", você o deseja para escala e desempenho, mas seu código não depende disso (como usar
mapPartitions()
etc), certo?fonte
Consegui fazer isso usando RDD. Mas não sei se essa é uma solução aceitável para você. Depois de ter o DF disponível como um RDD, você pode aplicar
repartitionAndSortWithinPartitions
para executar o reparticionamento personalizado dos dados.Aqui está uma amostra que eu usei:
fonte