Utilizando o spark 2.4.4 em execução no modo de cluster YARN com o planejador FIFO do spark.
Estou enviando várias operações de quadro de dados spark (ou seja, gravando dados no S3) usando um executor de pool de threads com um número variável de threads. Isso funciona bem se eu tiver ~ 10 threads, mas se eu usar centenas de threads, parece haver um impasse, sem trabalhos agendados de acordo com a interface do usuário do Spark.
Quais fatores controlam quantos trabalhos podem ser agendados simultaneamente? Recursos do driver (por exemplo, memória / núcleos)? Algumas outras definições de configuração do spark?
EDITAR:
Aqui está uma breve sinopse do meu código
ExecutorService pool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(pool);
Dataset<Row> aHugeDf = spark.read.json(hundredsOfPaths);
List<Future<Void>> futures = listOfSeveralHundredThings
.stream()
.map(aThing -> ecs.submit(() -> {
df
.filter(col("some_column").equalTo(aThing))
.write()
.format("org.apache.hudi")
.options(writeOptions)
.save(outputPathFor(aThing));
return null;
}))
.collect(Collectors.toList());
IntStream.range(0, futures.size()).forEach(i -> ecs.poll(30, TimeUnit.MINUTES));
exec.shutdownNow();
Em algum momento, à medida que nThreads
aumenta, a faísca não parece mais agendar trabalhos, conforme evidenciado por:
ecs.poll(...)
tempo limite eventualmente- A guia Trabalhos da interface do usuário do Spark que não mostra trabalhos ativos
- A guia Executores da interface do usuário do Spark que não mostra tarefas ativas para nenhum executor
- A guia SQL da UI do Spark mostrando
nThreads
consultas em execução sem IDs de tarefa em execução
Meu ambiente de execução é
- AWS EMR 5.28.1
- Spark 2.4.4
- Nó mestre =
m5.4xlarge
- Nós principais = 3x
rd5.24xlarge
spark.driver.cores=24
spark.driver.memory=32g
spark.executor.memory=21g
spark.scheduler.mode=FIFO
fonte
jstack -l
para obter um despejo de thread com informações de bloqueio.Respostas:
Se possível, grave a saída das tarefas nos hdfs do AWS Elastic MapReduce (para aproveitar as renomeações quase instantâneas e a melhor E / S de arquivo dos hdfs locais) e adicione uma etapa dstcp para mover os arquivos para o S3, para economizar todos os problemas de manipulação do entranhas de um armazenamento de objetos tentando ser um sistema de arquivos. Também a gravação em hdfs locais permitirá que você especule controlar tarefas descontroladas sem cair nas armadilhas de conflito associadas ao DirectOutputCommiter.
Se você precisar usar S3 como o diretório de saída, verifique se as seguintes configurações do Spark estão definidas
Nota: O DirectParquetOutputCommitter foi removido do Spark 2.0 devido à chance de perda de dados. Infelizmente, até termos aprimorado a consistência do S3a, precisamos trabalhar com as soluções alternativas. As coisas estão melhorando com o Hadoop 2.8
Evite nomes-chave em ordem lexicográfica. Pode-se usar prefixos hash / aleatórios ou reverter a data e a hora para se deslocar. O truque é nomear suas chaves hierarquicamente, colocando as coisas mais comuns pelas quais você filtra no lado esquerdo da sua chave. E nunca tenha sublinhados nos nomes dos buckets devido a problemas de DNS.
Ativando
fs.s3a.fast.upload upload
partes de um único arquivo no Amazon S3 em paraleloConsulte estes artigos para obter mais detalhes.
Definir spark.speculation no Spark 2.1.0 enquanto escreve para s3
https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98
fonte
Na IMO, você provavelmente está abordando esse problema errado. A menos que você possa garantir que o número de tarefas por trabalho seja muito baixo, é provável que você não obtenha muitas melhorias de desempenho paralelando centenas de trabalhos de uma só vez. Seu cluster pode suportar apenas 300 tarefas de uma só vez, supondo que você esteja usando o paralelismo padrão de 200 e apenas 1,5 trabalhos. Sugiro reescrever seu código para limitar o máximo de consultas simultâneas em 10. Suspeito muito que você tenha 300 consultas com apenas uma única tarefa de várias centenas em execução. A maioria dos sistemas de processamento de dados OLTP possui intencionalmente um nível bastante baixo de consultas simultâneas em comparação com os sistemas RDS mais tradicionais por esse motivo.
Além disso
fonte