Como os estágios são divididos em tarefas no Spark?

143

Vamos assumir o seguinte: apenas um trabalho do Spark está sendo executado a cada momento.

O que eu recebo até agora

Aqui está o que eu entendo o que acontece no Spark:

  1. Quando a SparkContexté criado, cada nó do trabalhador inicia um executor. Executores são processos separados (JVM), que se conectam novamente ao programa do driver. Cada executor possui o jar do programa do driver. Sair de um driver, desliga os executores. Cada executor pode conter algumas partições.
  2. Quando um trabalho é executado, um plano de execução é criado de acordo com o gráfico de linhagem.
  3. O trabalho de execução é dividido em estágios, em que estágios contém tantas transformações e ações vizinhas (no gráfico de linhagem), mas sem embaralhamento. Assim, os estágios são separados por shuffles.

imagem 1

Eu entendi aquilo

  • Uma tarefa é um comando enviado do driver para um executor serializando o objeto Function.
  • O executor desserializa (com o jar do driver) o comando (tarefa) e o executa em uma partição.

mas

Questões)

Como divido o palco nessas tarefas?

Especificamente:

  1. As tarefas são determinadas pelas transformações e ações ou podem haver várias transformações / ações em uma tarefa?
  2. As tarefas são determinadas pela partição (por exemplo, uma tarefa por estágio por partição).
  3. As tarefas são determinadas pelos nós (por exemplo, uma tarefa por estágio por nó)?

O que eu acho (apenas resposta parcial, mesmo que correta)

Em https://0x0fff.com/spark-architecture-shuffle , o shuffle é explicado com a imagem

insira a descrição da imagem aqui

e tenho a impressão de que a regra é

cada estágio é dividido em # tarefas de número de partições, sem levar em consideração o número de nós

Para minha primeira imagem, eu diria que teria 3 tarefas de mapa e 3 tarefas de redução.

Para a imagem de 0x0fff, eu diria que existem 8 tarefas de mapa e 3 tarefas de redução (supondo que haja apenas três arquivos laranja e três verde escuro).

Perguntas abertas em qualquer caso

Isso está correto? Mas mesmo que isso esteja correto, minhas perguntas acima não são todas respondidas, porque ainda está em aberto, se várias operações (por exemplo, mapas múltiplos) estão dentro de uma tarefa ou são separadas em uma tarefa por operação.

O que os outros dizem

O que é uma tarefa no Spark? Como o trabalhador Spark executa o arquivo jar? e Como o agendador do Apache Spark divide arquivos em tarefas? são semelhantes, mas não achei que minha pergunta fosse respondida claramente lá.

Make42
fonte

Respostas:

52

Você tem um belo esboço aqui. Para responder suas perguntas

  • Um separado task não precisa ser lançado para cada partição de dados para cada stage. Considere que cada partição provavelmente residirá em locais físicos distintos - por exemplo, blocos no HDFS ou diretórios / volumes para um sistema de arquivos local.

Observe que o envio de Stages é conduzido pelo DAG Scheduler. Isso significa que estágios que não são interdependentes podem ser enviados ao cluster para execução em paralelo: isso maximiza o recurso de paralelização no cluster. Portanto, se as operações em nosso fluxo de dados puderem ocorrer simultaneamente, esperaremos o lançamento de vários estágios.

Podemos ver isso em ação no exemplo de brinquedo a seguir, no qual realizamos os seguintes tipos de operações:

  • carregar duas fontes de dados
  • execute alguma operação de mapa em ambas as fontes de dados separadamente
  • Junte-se a eles
  • execute algumas operações de mapa e filtro no resultado
  • salve o resultado

Então, em quantos estágios vamos terminar?

  • 1 estágio cada para carregar as duas fontes de dados em paralelo = 2 estágios
  • Um terceiro estágio representando o joinque depende dos outros dois estágios
  • Nota: todas as operações subseqüentes que trabalham nos dados unidos podem ser executadas no mesmo estágio, porque devem ocorrer sequencialmente. Não há nenhum benefício em iniciar estágios adicionais porque eles não podem iniciar o trabalho até que a operação anterior tenha sido concluída.

Aqui está o programa de brinquedos

val sfi  = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")

E aqui está o DAG do resultado

insira a descrição da imagem aqui

Agora: quantas tarefas ? O número de tarefas deve ser igual a

Soma de ( Stage* #Partitions in the stage)

javadba
fonte
2
Obrigado! Por favor, elabore sua resposta em relação ao meu texto: 1) Minha definição de estágios não é abrangente? Parece que perdi o requisito de que um estágio não possa conter operações que poderiam ser paralelas. Ou a minha descrição já está estritamente implicando isso? 2) O número de tarefas que precisam ser executadas para o trabalho é determinado pelo número de partições, mas não pelo número de processadores ou nós, enquanto o número de tarefas que podem ser executadas ao mesmo tempo depende do número de partições. processadores, certo? 3) Uma tarefa pode conter várias operações?
Make42
1
4) O que você quis dizer com sua última frase? Afinal, as partições numéricas podem variar de estágio para estágio. Você quis dizer que foi assim que você configurou seu trabalho para todas as etapas?
Make42
@ Make42 É claro que o número de partições pode variar de estágio para estágio - você está correto. Era minha intenção dizer sum(..)para levar em conta essa variação.
javadba
nossa, sua resposta foi totalmente correta, mas infelizmente a última frase é definitivamente um conceito errado. Isso não significa que os números de partição em um estágio sejam iguais ao número de processadores; no entanto, você pode definir o número de partições para um RDD de acordo com o número de núcleos apresentados em sua máquina.
epcpu
@epcpu Foi um caso especial - mas concordo que isso seria enganoso, por isso estou removendo-o.
Javadba
26

Isso pode ajudá-lo a entender melhor as diferentes partes:

  • Estágio: é uma coleção de tarefas. Mesmo processo em execução em diferentes subconjuntos de dados (partições).
  • Tarefa: representa uma unidade de trabalho em uma partição de um conjunto de dados distribuído. Assim, em cada estágio, número de tarefas = número de partições ou, como você disse, "uma tarefa por estágio por partição".
  • Cada executor é executado em um contêiner de fios e cada contêiner reside em um nó.
  • Cada estágio utiliza múltiplos executores, cada executor recebe vários vcores.
  • Cada vcore pode executar exatamente uma tarefa por vez
  • Portanto, em qualquer estágio, várias tarefas podem ser executadas em paralelo. número de tarefas em execução = número de vcores em uso.
pedram bashiri
fonte
2
Esta é uma leitura muito útil na arquitetura faísca: 0x0fff.com/spark-architecture
pedram Bashiri
Não obtive o número do seu ponto 3. Tanto quanto sei que cada nó pode ter vários executores, portanto, de acordo com o ponto 3: deve haver apenas um executor por nó. Você pode esclarecer esse ponto?
Rituparno Behera 23/07
@RituparnoBehera cada nó pode ter vários contêineres e, portanto, vários executores Spark. Confira este link. docs.cloudera.com/runtime/7.0.2/running-spark-applications/…
pedram bashiri 23/07
15

Se bem entendi, existem 2 coisas (relacionadas) que confundem você:

1) O que determina o conteúdo de uma tarefa?

2) O que determina o número de tarefas a serem executadas?

O mecanismo do Spark "cola" em conjunto operações simples em rdds consecutivos, por exemplo:

rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

portanto, quando o rdd3 for computado (preguiçosamente), o spark gerará uma tarefa por partição do rdd1 e cada tarefa executará o filtro e o mapa por linha para resultar no rdd3.

O número de tarefas é determinado pelo número de partições. Todo RDD possui um número definido de partições. Para um RDD de origem que é lido do HDFS (usando sc.textFile (...) por exemplo), o número de partições é o número de divisões geradas pelo formato de entrada. Algumas operações no RDD (s) podem resultar em um RDD com um número diferente de partições:

rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).

Outro exemplo é junções:

rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).

(A maioria) das operações que alteram o número de partições envolvem um shuffle. Quando fazemos, por exemplo:

rdd2 = rdd1.repartition( 1000 ) 

o que realmente acontece é que a tarefa em cada partição do rdd1 precisa produzir uma saída final que possa ser lida no estágio a seguir, para que o rdd2 tenha exatamente 1000 partições (como elas fazem isso? Hash ou Sort ). As tarefas deste lado são às vezes chamadas de "Tarefas do mapa (lado)". Uma tarefa que será executada posteriormente no rdd2 atuará em uma partição (do rdd2!) E terá que descobrir como ler / combinar as saídas do lado do mapa relevantes para essa partição. As tarefas deste lado são às vezes chamadas de "Reduzir tarefas (laterais)".

As 2 perguntas estão relacionadas: o número de tarefas em um estágio é o número de partições (comuns aos rdds consecutivos "colados" juntos) e o número de partições de um rdd pode mudar entre os estágios (especificando o número de partições para alguns baralhar causando operação, por exemplo).

Depois que a execução de um estágio começa, suas tarefas podem ocupar slots de tarefas. O número de slots de tarefas simultâneos é numExecutors * ExecutorCores. Em geral, eles podem ser ocupados por tarefas de diferentes estágios não dependentes.

Harel Gliksman
fonte