Como o Hadoop processa os registros divididos entre os limites do bloco?

119

De acordo com Hadoop - The Definitive Guide

Os registros lógicos que FileInputFormats definem geralmente não se encaixam perfeitamente em blocos HDFS. Por exemplo, os registros lógicos de um TextInputFormat são linhas, que cruzarão os limites do HDFS com mais freqüência. Isso não tem relação com o funcionamento do seu programa - as linhas não são perdidas ou quebradas, por exemplo - mas vale a pena conhecer, pois significa que os mapas locais de dados (ou seja, mapas que estão sendo executados no mesmo host que seus dados de entrada) fará algumas leituras remotas. A leve sobrecarga que isso causa normalmente não é significativa.

Suponha que uma linha de registro seja dividida em dois blocos (b1 e b2). O mapeador que processa o primeiro bloco (b1) notará que a última linha não tem um separador EOL e busca o restante da linha do próximo bloco de dados (b2).

Como o mapeador que processa o segundo bloco (b2) determina que o primeiro registro está incompleto e deve processar a partir do segundo registro no bloco (b2)?

Praveen Sripati
fonte

Respostas:

160

Pergunta interessante, passei algum tempo olhando o código para obter os detalhes e aqui estão meus pensamentos. As divisões são gerenciadas pelo cliente InputFormat.getSplits, portanto, uma olhada em FileInputFormat fornece as seguintes informações:

  • Para cada arquivo de entrada, obtenha o comprimento do arquivo, o tamanho do bloco e calcule o tamanho da divisão como max(minSize, min(maxSize, blockSize))onde maxSizecorresponde mapred.max.split.sizee minSizeé mapred.min.split.size.
  • Divida o arquivo em diferentes FileSplits com base no tamanho da divisão calculado acima. O que é importante aqui é que cada um FileSplitseja inicializado com um startparâmetro correspondente ao deslocamento no arquivo de entrada . Ainda não há tratamento das linhas nesse ponto. A parte relevante do código se parece com isto:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

Depois disso, se você olhar para o LineRecordReaderque é definido por TextInputFormat, é aí que as linhas são tratadas:

  • Quando você inicializa o seu, LineRecordReaderele tenta instanciar um, LineReaderque é uma abstração para poder ler as linhas FSDataInputStream. Existem 2 casos:
  • Se houver um CompressionCodecdefinido, este codec é responsável por lidar com os limites. Provavelmente não é relevante para sua pergunta.
  • Se não houver nenhum codec, entretanto, é aí que as coisas são interessantes: se o startde seu InputSplitfor diferente de 0, então você volta 1 caractere e pula a primeira linha que encontrar identificada por \ n ou \ r \ n (Windows) ! O retrocesso é importante porque, no caso de seus limites de linha serem iguais aos limites de divisão, isso garante que você não ignore a linha válida. Aqui está o código relevante:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

Portanto, como as divisões são calculadas no cliente, os mapeadores não precisam ser executados em sequência, todo mapeador já sabe se precisa descartar a primeira linha ou não.

Então, basicamente, se você tiver 2 linhas de cada 100Mb no mesmo arquivo, e para simplificar vamos dizer que o tamanho da divisão é 64Mb. Então, quando as divisões de entrada são calculadas, teremos o seguinte cenário:

  • Divisão 1 contendo o caminho e os hosts para este bloco. Inicializado no início 200-200 = 0 MB, comprimento 64 MB.
  • A divisão 2 foi inicializada no início 200-200 + 64 = 64Mb, comprimento 64Mb.
  • A divisão 3 foi inicializada no início 200-200 + 128 = 128Mb, comprimento 64Mb.
  • A divisão 4 foi inicializada no início 200-200 + 192 = 192Mb, comprimento 8Mb.
  • O mapeador A processará a divisão 1, o início é 0, então não pule a primeira linha e leia uma linha inteira que vai além do limite de 64 MB, portanto, precisa de leitura remota.
  • O mapeador B processará a divisão 2, o início é! = 0, então pule a primeira linha após 64 MB-1byte, que corresponde ao final da linha 1 em 100 MB que ainda está na divisão 2, temos 28 MB da linha na divisão 2, então remoto ler os 72Mb restantes.
  • O mapeador C processará a divisão 3, o início é! = 0, então pule a primeira linha após 128Mb-1byte, que corresponde ao final da linha 2 a 200Mb, que é o fim do arquivo, então não faça nada.
  • O mapeador D é igual ao mapeador C, exceto que procura uma nova linha após 192Mb-1byte.
Charles Menguy
fonte
Também @PraveenSripati, vale a pena mencionar que os casos extremos em que um limite estaria em \ r em um \ r \ n retorno são tratados na LineReader.readLinefunção, não acho que seja relevante para a sua pergunta, mas posso adicionar mais detalhes se necessário.
Charles Menguy
Vamos supor que haja duas linhas com 64 MB exatos na entrada e, portanto, os InputSplits acontecem exatamente nos limites da linha. Assim, o mapeador sempre ignorará a linha no segundo bloco porque start! = 0.
Praveen Sripati
6
@PraveenSripati Nesse caso, o segundo mapeador verá start! = 0, então retroceda 1 caractere, que o traz de volta um pouco antes do \ n da primeira linha e pule para o \ n seguinte. Portanto, ele ignorará a primeira linha, mas processará a segunda linha conforme o esperado.
Charles Menguy
@CharlesMenguy é possível que a primeira linha do arquivo seja ignorada de alguma forma? Concretamente, tenho a primeira linha com chave = 1 e valor a, então há mais duas linhas com a mesma chave em algum lugar do arquivo, chave = 1, val = be chave = 1, val = c. A questão é que meu redutor obtém {1, [b, c]} e {1, [a]}, em vez de {1, [a, b, c]}. Isso não acontecerá se eu adicionar uma nova linha ao início do meu arquivo. Qual poderia ser a razão, senhor?
Kobe-Wan Kenobi
@CharlesMenguy E se o arquivo no HDFS for um arquivo binário (ao contrário do arquivo de texto, no qual \r\n, \nrepresenta o truncamento de registro)?
CᴴᴀZ
17

O algoritmo Map Reduce não funciona em blocos físicos do arquivo. Ele funciona em divisões de entrada lógicas. A divisão da entrada depende de onde o registro foi escrito. Um registro pode abranger dois mapeadores.

Da maneira como o HDFS foi configurado, ele divide arquivos muito grandes em grandes blocos (por exemplo, medindo 128 MB) e armazena três cópias desses blocos em nós diferentes no cluster.

O HDFS não tem conhecimento do conteúdo desses arquivos. Um registro pode ter sido iniciado no Bloco-a, mas o final desse registro pode estar presente no Bloco-b .

Para resolver esse problema, o Hadoop usa uma representação lógica dos dados armazenados em blocos de arquivos, conhecidos como divisões de entrada. Quando um cliente de trabalho MapReduce calcula as divisões de entrada , ele descobre onde o primeiro registro inteiro em um bloco começa e onde termina o último registro no bloco .

O ponto chave:

Nos casos em que o último registro em um bloco está incompleto, a divisão de entrada inclui informações de localização para o próximo bloco e o deslocamento de byte dos dados necessários para completar o registro.

Dê uma olhada no diagrama abaixo.

insira a descrição da imagem aqui

Dê uma olhada neste artigo e na pergunta SE relacionada: Sobre a divisão de arquivos Hadoop / HDFS

Mais detalhes podem ser lidos na documentação

A estrutura Map-Reduce depende do InputFormat do trabalho para:

  1. Valide a especificação de entrada do trabalho.
  2. Divida o (s) arquivo (s) de entrada em InputSplits lógicos, cada um deles atribuído a um mapeador individual.
  3. Cada InputSplit é então atribuído a um mapeador individual para processamento. A divisão pode ser uma tupla . InputSplit[] getSplits(JobConf job,int numSplits) é a API para cuidar dessas coisas.

FileInputFormat , que estende o método InputFormatimplementado getSplits(). Dê uma olhada na parte interna deste método em grepcode

Ravindra babu
fonte
7

Eu vejo isso da seguinte maneira: InputFormat é responsável por dividir os dados em divisões lógicas, levando em consideração a natureza dos dados.
Nada impede que isso aconteça, embora possa adicionar latência significativa ao trabalho - toda a lógica e leitura em torno dos limites de tamanho de divisão desejados acontecerão no rastreador de trabalho.
O formato de entrada com reconhecimento de registro mais simples é TextInputFormat. Está funcionando da seguinte forma (pelo que entendi pelo código) - formato de entrada cria divisões por tamanho, independentemente das linhas, mas LineRecordReader sempre:
a) Pule a primeira linha na divisão (ou parte dela), se não for a primeira divisão
b) Lê uma linha após o limite da divisão no final (se houver dados disponíveis, portanto, não é a última divisão).

David Gruzman
fonte
Skip first line in the split (or part of it), if it is not the first split- se o primeiro registro no bloco que não for o primeiro estiver completo, não tenho certeza de como essa lógica funcionará.
Praveen Sripati
Pelo que vejo o código - cada divisão lê o que tem + próxima linha. Portanto, se a quebra de linha não estiver no limite do bloco, está tudo bem. Como exatamente o caso quando a quebra de linha está exatamente no limite do bloco - tem que ser entendido - vou ler o código um pouco mais
David Gruzman
3

Pelo que entendi, quando o FileSplité inicializado para o primeiro bloco, o construtor padrão é chamado. Portanto, os valores de início e comprimento são zero inicialmente. Ao final do processamento do primeiro bloco, se a última linha estiver incompleta, então o valor do comprimento será maior do que o comprimento da divisão e lerá a primeira linha do próximo bloco também. Devido a isso, o valor de início para o primeiro bloco será maior que zero e nesta condição, o LineRecordReaderirá pular a primeira linha do segundo bloco. (Ver fonte )

Caso a última linha do primeiro bloco esteja completa, então o valor do comprimento será igual ao comprimento do primeiro bloco e o valor do início do segundo bloco será zero. Nesse caso, o LineRecordReadernão irá pular a primeira linha e ler o segundo bloco do início.

Faz sentido?

aa8y
fonte
2
Nesse cenário, os mapeadores precisam se comunicar entre si e processar os blocos em sequência quando a última linha de um bloco específico não estiver completa. Não tenho certeza se é assim que funciona.
Praveen Sripati
1

Do código-fonte hadoop de LineRecordReader.java o construtor: Encontro alguns comentários:

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;

a partir disso, acredito que o hadoop lerá uma linha extra para cada divisão (no final da divisão atual, lerá a próxima linha na próxima divisão) e, se não for a primeira divisão, a primeira linha será descartada. de modo que nenhum registro de linha seja perdido e incompleto

Shenghai.Geng
fonte
0

Os mapeadores não precisam se comunicar. Os blocos de arquivo estão em HDFS e o mapeador atual (RecordReader) pode ler o bloco que possui a parte restante da linha. Isso acontece nos bastidores.

user3507308
fonte