Apache Spark: O número de núcleos versus o número de executores

192

Estou tentando entender a relação do número de núcleos e o número de executores ao executar uma tarefa do Spark no YARN.

O ambiente de teste é o seguinte:

  • Número de nós de dados: 3
  • Especificação da máquina do nó de dados:
    • CPU: Core i7-4790 (nº de núcleos: 4, nº de threads: 8)
    • RAM: 32 GB (8 GB x 4)
    • HDD: 8 TB (2 TB x 4)
  • Rede: 1Gb

  • Versão Spark: 1.0.0

  • Versão do Hadoop: 2.4.0 (Hortonworks HDP 2.1)

  • Fluxo de trabalho do Spark: sc.textFile -> filtro -> mapa -> filtro -> mapToPair -> reduzirByKey -> mapa -> saveAsTextFile

  • Dados de entrada

    • Tipo: arquivo de texto único
    • Tamanho: 165GB
    • Número de linhas: 454.568.833
  • Resultado

    • Número de linhas após o segundo filtro: 310,640,717
    • Número de linhas do arquivo de resultados: 99.848.268
    • Tamanho do arquivo de resultado: 41GB

O trabalho foi executado com as seguintes configurações:

  1. --master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3 (executores por nó de dados, use até núcleos)

  2. --master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3 (número de núcleos reduzido)

  3. --master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12 (menos núcleo, mais executor)

Tempos decorridos:

  1. 50 min 15 seg

  2. 55 min 48 seg

  3. 31 min 23 seg

Para minha surpresa, (3) foi muito mais rápido.
Eu pensei que (1) seria mais rápido, pois haveria menos comunicação entre executores ao embaralhar.
Embora o número de núcleos de (1) seja menor que (3), o número de núcleos não é o fator principal, pois 2) teve um bom desempenho.

(Seguintes foram adicionados após a resposta de pwilmot.)

Para obter informações, a captura de tela do monitor de desempenho é a seguinte:

  • Resumo do nó de dados Ganglia para (1) - trabalho iniciado às 04:37.

Resumo do nó de dados Ganglia para (1)

  • Resumo do nó de dados Ganglia para (3) - trabalho iniciado às 19:47. Por favor, ignore o gráfico antes dessa hora.

Resumo do nó de dados Ganglia para (3)

O gráfico divide-se aproximadamente em 2 seções:

  • Primeiro: do começo ao reduzirByKey: uso intensivo da CPU, nenhuma atividade de rede
  • Segundo: depois de reduzem-seByKey: a CPU diminui, a E / S da rede é concluída.

Como mostra o gráfico, (1) pode usar a energia da CPU que foi fornecida. Portanto, pode não ser o problema do número de threads.

Como explicar esse resultado?

Zeodtr
fonte
2
Agora estou suspeitando do GC ... De fato, na interface do usuário do Spark, o tempo total gasto no GC é maior em 1) do que 2).
21414 zeodtr
Por que você não tentou 3) com 19G? Será que limitar os trabalhadores em 4G reduz o efeito NUMA que algumas pessoas têm mancha? ou seja, seu 4G está localizado em um dos 2 núcleos alocados ao seu fluxo de trabalho e, portanto, há menos lentidão de E / S, resultando em melhores desempenhos gerais. Caso contrário, acho que uma pergunta principal é: quantos núcleos / thread podem usar um único executor em um trabalhador? (Só se pode especificar o número total de núcleos para um trabalhador, não na granularidade do executor)
Bacon
4
Btw Acabei de verificar o código em core / src / main / scala / org / apache / spark / deploy / worker / ExecutorRunner.scala e parece que 1 executor = 1 thread do trabalhador.
Bacon
um pouco tarde, mas aqui está um post sobre cloudera sobre este tópico: blog.cloudera.com/blog/2015/03/…
Orelus
1
A propósito, eu encontrei essas informações em um deck de slides cloudera slideshare.net/cloudera/… , que explica um pouco sobre a tomada de decisões em executores, núcleos e memória
Manish Sahni

Respostas:

58

Com sorte, para tornar tudo isso um pouco mais concreto, veja um exemplo prático de configuração de um aplicativo Spark para usar o máximo possível do cluster: Imagine um cluster com prático seis nós executando o NodeManagers, cada um equipado com 16 núcleos e 64 GB de memória . As capacidades do NodeManager, yarn.nodemanager.resource.memory-mb e yarn.nodemanager.resource.cpu-vcores, provavelmente devem ser configuradas para 63 * 1024 = 64512 (megabytes) e 15, respectivamente. Evitamos alocar 100% dos recursos para contêineres YARN porque o nó precisa de alguns recursos para executar os daemons OS e Hadoop. Nesse caso, deixamos um gigabyte e um núcleo para esses processos do sistema. O Cloudera Manager ajuda contabilizando-os e configurando essas propriedades do YARN automaticamente.

O primeiro impulso provável seria usar --num-executors 6 --executor-cores 15 --executor-memory 63G . No entanto, esta é a abordagem errada porque:

63 GB + a sobrecarga de memória do executor não se encaixa na capacidade de 63 GB dos NodeManagers. O mestre do aplicativo ocupará um núcleo em um dos nós, o que significa que não haverá espaço para um executor de 15 núcleos nesse nó. 15 núcleos por executor podem levar a uma taxa de transferência de E / S HDFS ruim.

Uma opção melhor seria usar --num-executors 17 --executor-cores 5 --executor-memory 19G . Por quê?

Essa configuração resulta em três executores em todos os nós, exceto aquele com o AM, que terá dois executores. --executor-memory foi derivada como (63/3 executores por nó) = 21. 21 * 0,07 = 1,47. 21 - 1,47 ~ 19.

A explicação foi dada em um artigo no blog de Cloudera, Como fazer: ajustar seus trabalhos do Apache Spark (parte 2) .

DzOrdre
fonte
1
"Esta configuração resulta em três executores em todos os nós, exceto no nó AM, que terá dois executores." O que isso significa com "--executor-cores 5"?
Derek
Isso significa que cada executor usa 5 núcleos. Cada nó possui 3 executores, portanto, usando 15 núcleos, exceto que um dos nós também estará executando o mestre de aplicativos para o trabalho, portanto, somente pode hospedar 2 executores, ou seja, 10 núcleos em uso como executores.
Davos
Bem explicado - observe que isso se aplica a yarn.scheduler.capacity.resource-calculatorpessoas com deficiência, que é o padrão. Isso ocorre porque, por padrão, ele agenda pela Memória e não pela CPU.
YoYo
1
Mais executores podem levar a uma taxa de transferência de E / S HDFS ruim. Então, se eu não estiver usando o HDFS, nesse caso, posso usar mais de 5 núcleos por executor?
Darshan
Embora o mestre de aplicativos seja executado em cada nó. Conforme acima, o que significa que haveria apenas 1 mestre de aplicativos para executar o trabalho. Isso está correto?
Roshan Fernando
15

Conforme você executa seu aplicativo spark no HDFS, de acordo com Sandy Ryza

Percebi que o cliente HDFS tem problemas com toneladas de threads simultâneos. Um palpite aproximado é que, no máximo, cinco tarefas por executor podem atingir uma taxa de transferência total de gravação, portanto, é bom manter o número de núcleos por executor abaixo desse número.

Portanto, acredito que sua primeira configuração é mais lenta que a terceira devido a uma taxa de transferência de E / S HDFS ruim

tgbaggio
fonte
11

Eu não brinquei com essas configurações, então isso é apenas especulação, mas se pensarmos sobre esse problema como núcleos e threads normais em um sistema distribuído, no cluster você poderá usar até 12 núcleos (máquinas 4 * 3) e 24 threads (8 * 3 máquinas). Nos dois primeiros exemplos, você está dando ao seu trabalho um número razoável de núcleos (espaço potencial de computação), mas o número de threads (trabalhos) a serem executados nesses núcleos é tão limitado que você não pode usar grande parte do poder de processamento alocado e, portanto, o trabalho é mais lento, embora haja mais recursos de computação alocados.

você menciona que sua preocupação estava na etapa de reprodução aleatória - embora seja bom limitar a sobrecarga na etapa de reprodução aleatória, geralmente é muito mais importante utilizar a paralelização do cluster. Pense no caso extremo - um único programa encadeado com zero shuffle.

pwilmot
fonte
Obrigado pela sua resposta. Mas suspeito que o número de threads não seja o principal problema. Eu adicionei a captura de tela de monitoramento. Como mostra o gráfico, 1) pode usar tanta energia da CPU quanto foi fornecida.
zeodtr
1
O @zeodtr pwilmot está correto - você precisa de 2-4 tarefas MÍNIMAS para utilizar todo o potencial de seus núcleos. Coloque como era - normalmente uso pelo menos 1000 partições para o meu cluster de 80 núcleos.
samthebest
@samthebest O que eu quero saber é o motivo da diferença de desempenho entre 1) e 3). Quando assisto à interface do usuário do Spark, ambas executam 21 tarefas em paralelo na seção 2. (por que 21 em vez de 24 no caso de 3) são desconhecidas no momento) Mas, as tarefas para 3) são executadas mais rapidamente.
zeodtr
10

Resposta curta : acho que tgbaggio está certo. Você atinge os limites de taxa de transferência do HDFS em seus executores.

Penso que a resposta aqui pode ser um pouco mais simples do que algumas das recomendações aqui.

A pista para mim está no gráfico da rede de cluster. Para a execução 1, a utilização é constante em ~ 50 M bytes / s. Para a execução 3, a utilização constante é dobrada, em torno de 100 M bytes / s.

Na postagem do blog cloudera compartilhada por DzOrd , você pode ver esta citação importante:

Percebi que o cliente HDFS tem problemas com toneladas de threads simultâneos. Um palpite aproximado é que, no máximo, cinco tarefas por executor podem atingir uma taxa de transferência total de gravação, portanto, é bom manter o número de núcleos por executor abaixo desse número.

Então, vamos fazer alguns cálculos para ver qual desempenho esperamos se isso for verdade.


Execute 1: 19 GB, 7 núcleos, 3 executores

  • 3 executores x 7 threads = 21 threads
  • com 7 núcleos por executor, esperamos E / S limitadas a HDFS (máximo de ~ 5 núcleos)
  • rendimento efetivo ~ = 3 executores x 5 threads = 15 threads

Execute 3: 4 GB, 2 núcleos, 12 executores

  • 2 executores x 12 threads = 24 threads
  • 2 núcleos por executor, portanto, o rendimento em hdfs está ok
  • rendimento efetivo ~ = 12 executores x 2 threads = 24 threads

Se o trabalho for 100% limitado pela simultaneidade (o número de threads). Esperamos que o tempo de execução seja perfeitamente inversamente correlacionado com o número de threads.

ratio_num_threads = nthread_job1 / nthread_job3 = 15/24 = 0.625
inv_ratio_runtime = 1/(duration_job1 / duration_job3) = 1/(50/31) = 31/50 = 0.62

assim ratio_num_threads ~= inv_ratio_runtime , e parece que estamos com uma rede limitada.

Esse mesmo efeito explica a diferença entre a execução 1 e a execução 2.


Execute 2: 19 GB, 4 núcleos, 3 executores

  • 3 executores x 4 threads = 12 threads
  • com 4 núcleos por executor, ok IO para HDFS
  • rendimento efetivo ~ = 3 executores x 4 threads = 12 threads

Comparando o número de threads efetivos e o tempo de execução:

ratio_num_threads = nthread_job2 / nthread_job1 = 12/15 = 0.8
inv_ratio_runtime = 1/(duration_job2 / duration_job1) = 1/(55/50) = 50/55 = 0.91

Não é tão perfeito quanto a última comparação, mas ainda vemos uma queda semelhante no desempenho quando perdemos threads.

Agora, para a última parte: por que temos um desempenho melhor com mais threads, esp. mais threads do que o número de CPUs?

Uma boa explicação da diferença entre paralelismo (o que obtemos dividindo dados em várias CPUs) e simultaneidade (o que obtemos quando usamos vários threads para trabalhar em uma única CPU) é fornecida neste ótimo post por Rob Pike: Concurrency não é paralelismo .

A breve explicação é que, se um trabalho do Spark estiver interagindo com um sistema de arquivos ou rede, a CPU gasta muito tempo aguardando a comunicação com essas interfaces e não gastando muito tempo realmente "realizando o trabalho". Ao dar a essas CPUs mais de uma tarefa para trabalhar por vez, elas gastam menos tempo esperando e mais tempo trabalhando, e você obtém melhor desempenho.

turtlemonvh
fonte
1
Explicação interessante e convincente, pergunto-me se você achou que o executor tem um limite de 5 tarefas para atingir o rendimento máximo.
Dat Nguyen
Portanto, o número 5 não é algo que eu inventei: eu apenas notei sinais de gargalo de IO e saí em busca de onde esses gargalos podem estar surgindo.
turtlemonvh
8

Dos excelentes recursos disponíveis na página do pacote Sparklyr do RStudio :

DEFINIÇÕES DE CHUMBO :

Pode ser útil fornecer algumas definições simples para a nomenclatura Spark:

: um servidor

Nó de trabalho : um servidor que faz parte do cluster e está disponível para executar tarefas do Spark

Nó mestre : o servidor que coordena os nós do trabalhador.

Executor : Um tipo de máquina virtual dentro de um nó. Um nó pode ter vários executores.

Nó do Driver : O Nó que inicia a sessão do Spark. Normalmente, este será o servidor em que o sparklyr está localizado.

Driver (executor) : o nó do driver também será exibido na lista de executores.

d8aninja
fonte
1

A alocação dinâmica do Spark fornece flexibilidade e aloca recursos dinamicamente. Nesse número de min e max de executores podem ser dados. Também é possível fornecer o número de executores que devem ser iniciados no início do aplicativo.

Leia abaixo o mesmo:

http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation

Harikrishnan Ck
fonte
1

Há um pequeno problema nas duas primeiras configurações, eu acho. Os conceitos de threads e núcleos são os seguintes. O conceito de encadeamento é que, se os núcleos forem ideais, use esse núcleo para processar os dados. Portanto, a memória não é totalmente utilizada nos dois primeiros casos. Se você quiser comparar este exemplo, escolha as máquinas com mais de 10 núcleos em cada máquina. Então faça a marca do banco.

Mas não forneça mais de 5 núcleos por executor, haverá um gargalo no desempenho de E / S.

Portanto, as melhores máquinas para fazer essa marcação de bancada podem ser nós de dados com 10 núcleos.

Especificação da máquina do nó de dados: CPU: Core i7-4790 (número de núcleos: 10, número de threads: 20) RAM: 32GB (8GB x 4) HDD: 8TB (2TB x 4)

estrela Solitária
fonte
0

Eu acho que uma das principais razões é a localidade. O tamanho do seu arquivo de entrada é 165G, os blocos relacionados ao arquivo certamente distribuídos por vários DataNodes, mais executores podem evitar a cópia em rede.

Tente definir o executor num número igual de blocos, acho que pode ser mais rápido.

zwb
fonte