Estou tentando entender a relação do número de núcleos e o número de executores ao executar uma tarefa do Spark no YARN.
O ambiente de teste é o seguinte:
- Número de nós de dados: 3
- Especificação da máquina do nó de dados:
- CPU: Core i7-4790 (nº de núcleos: 4, nº de threads: 8)
- RAM: 32 GB (8 GB x 4)
- HDD: 8 TB (2 TB x 4)
Rede: 1Gb
Versão Spark: 1.0.0
Versão do Hadoop: 2.4.0 (Hortonworks HDP 2.1)
Fluxo de trabalho do Spark: sc.textFile -> filtro -> mapa -> filtro -> mapToPair -> reduzirByKey -> mapa -> saveAsTextFile
Dados de entrada
- Tipo: arquivo de texto único
- Tamanho: 165GB
- Número de linhas: 454.568.833
Resultado
- Número de linhas após o segundo filtro: 310,640,717
- Número de linhas do arquivo de resultados: 99.848.268
- Tamanho do arquivo de resultado: 41GB
O trabalho foi executado com as seguintes configurações:
--master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3
(executores por nó de dados, use até núcleos)--master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3
(número de núcleos reduzido)--master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12
(menos núcleo, mais executor)
Tempos decorridos:
50 min 15 seg
55 min 48 seg
31 min 23 seg
Para minha surpresa, (3) foi muito mais rápido.
Eu pensei que (1) seria mais rápido, pois haveria menos comunicação entre executores ao embaralhar.
Embora o número de núcleos de (1) seja menor que (3), o número de núcleos não é o fator principal, pois 2) teve um bom desempenho.
(Seguintes foram adicionados após a resposta de pwilmot.)
Para obter informações, a captura de tela do monitor de desempenho é a seguinte:
- Resumo do nó de dados Ganglia para (1) - trabalho iniciado às 04:37.
- Resumo do nó de dados Ganglia para (3) - trabalho iniciado às 19:47. Por favor, ignore o gráfico antes dessa hora.
O gráfico divide-se aproximadamente em 2 seções:
- Primeiro: do começo ao reduzirByKey: uso intensivo da CPU, nenhuma atividade de rede
- Segundo: depois de reduzem-seByKey: a CPU diminui, a E / S da rede é concluída.
Como mostra o gráfico, (1) pode usar a energia da CPU que foi fornecida. Portanto, pode não ser o problema do número de threads.
Como explicar esse resultado?
fonte
Respostas:
A explicação foi dada em um artigo no blog de Cloudera, Como fazer: ajustar seus trabalhos do Apache Spark (parte 2) .
fonte
yarn.scheduler.capacity.resource-calculator
pessoas com deficiência, que é o padrão. Isso ocorre porque, por padrão, ele agenda pela Memória e não pela CPU.Conforme você executa seu aplicativo spark no HDFS, de acordo com Sandy Ryza
Portanto, acredito que sua primeira configuração é mais lenta que a terceira devido a uma taxa de transferência de E / S HDFS ruim
fonte
Eu não brinquei com essas configurações, então isso é apenas especulação, mas se pensarmos sobre esse problema como núcleos e threads normais em um sistema distribuído, no cluster você poderá usar até 12 núcleos (máquinas 4 * 3) e 24 threads (8 * 3 máquinas). Nos dois primeiros exemplos, você está dando ao seu trabalho um número razoável de núcleos (espaço potencial de computação), mas o número de threads (trabalhos) a serem executados nesses núcleos é tão limitado que você não pode usar grande parte do poder de processamento alocado e, portanto, o trabalho é mais lento, embora haja mais recursos de computação alocados.
você menciona que sua preocupação estava na etapa de reprodução aleatória - embora seja bom limitar a sobrecarga na etapa de reprodução aleatória, geralmente é muito mais importante utilizar a paralelização do cluster. Pense no caso extremo - um único programa encadeado com zero shuffle.
fonte
Penso que a resposta aqui pode ser um pouco mais simples do que algumas das recomendações aqui.
A pista para mim está no gráfico da rede de cluster. Para a execução 1, a utilização é constante em ~ 50 M bytes / s. Para a execução 3, a utilização constante é dobrada, em torno de 100 M bytes / s.
Na postagem do blog cloudera compartilhada por DzOrd , você pode ver esta citação importante:
Então, vamos fazer alguns cálculos para ver qual desempenho esperamos se isso for verdade.
Execute 1: 19 GB, 7 núcleos, 3 executores
Execute 3: 4 GB, 2 núcleos, 12 executores
Se o trabalho for 100% limitado pela simultaneidade (o número de threads). Esperamos que o tempo de execução seja perfeitamente inversamente correlacionado com o número de threads.
assim
ratio_num_threads ~= inv_ratio_runtime
, e parece que estamos com uma rede limitada.Esse mesmo efeito explica a diferença entre a execução 1 e a execução 2.
Execute 2: 19 GB, 4 núcleos, 3 executores
Comparando o número de threads efetivos e o tempo de execução:
Não é tão perfeito quanto a última comparação, mas ainda vemos uma queda semelhante no desempenho quando perdemos threads.
Agora, para a última parte: por que temos um desempenho melhor com mais threads, esp. mais threads do que o número de CPUs?
Uma boa explicação da diferença entre paralelismo (o que obtemos dividindo dados em várias CPUs) e simultaneidade (o que obtemos quando usamos vários threads para trabalhar em uma única CPU) é fornecida neste ótimo post por Rob Pike: Concurrency não é paralelismo .
A breve explicação é que, se um trabalho do Spark estiver interagindo com um sistema de arquivos ou rede, a CPU gasta muito tempo aguardando a comunicação com essas interfaces e não gastando muito tempo realmente "realizando o trabalho". Ao dar a essas CPUs mais de uma tarefa para trabalhar por vez, elas gastam menos tempo esperando e mais tempo trabalhando, e você obtém melhor desempenho.
fonte
Dos excelentes recursos disponíveis na página do pacote Sparklyr do RStudio :
fonte
A alocação dinâmica do Spark fornece flexibilidade e aloca recursos dinamicamente. Nesse número de min e max de executores podem ser dados. Também é possível fornecer o número de executores que devem ser iniciados no início do aplicativo.
Leia abaixo o mesmo:
fonte
Há um pequeno problema nas duas primeiras configurações, eu acho. Os conceitos de threads e núcleos são os seguintes. O conceito de encadeamento é que, se os núcleos forem ideais, use esse núcleo para processar os dados. Portanto, a memória não é totalmente utilizada nos dois primeiros casos. Se você quiser comparar este exemplo, escolha as máquinas com mais de 10 núcleos em cada máquina. Então faça a marca do banco.
Mas não forneça mais de 5 núcleos por executor, haverá um gargalo no desempenho de E / S.
Portanto, as melhores máquinas para fazer essa marcação de bancada podem ser nós de dados com 10 núcleos.
Especificação da máquina do nó de dados: CPU: Core i7-4790 (número de núcleos: 10, número de threads: 20) RAM: 32GB (8GB x 4) HDD: 8TB (2TB x 4)
fonte
Eu acho que uma das principais razões é a localidade. O tamanho do seu arquivo de entrada é 165G, os blocos relacionados ao arquivo certamente distribuídos por vários DataNodes, mais executores podem evitar a cópia em rede.
Tente definir o executor num número igual de blocos, acho que pode ser mais rápido.
fonte