Pergunta interessante, passei algum tempo olhando o código para obter os detalhes e aqui estão meus pensamentos. As divisões são gerenciadas pelo cliente InputFormat.getSplits
, portanto, uma olhada em FileInputFormat fornece as seguintes informações:
- Para cada arquivo de entrada, obtenha o comprimento do arquivo, o tamanho do bloco e calcule o tamanho da divisão como
max(minSize, min(maxSize, blockSize))
onde maxSize
corresponde mapred.max.split.size
e minSize
é mapred.min.split.size
.
Divida o arquivo em diferentes FileSplit
s com base no tamanho da divisão calculado acima. O que é importante aqui é que cada um FileSplit
seja inicializado com um start
parâmetro correspondente ao deslocamento no arquivo de entrada . Ainda não há tratamento das linhas nesse ponto. A parte relevante do código se parece com isto:
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
Depois disso, se você olhar para o LineRecordReader
que é definido por TextInputFormat
, é aí que as linhas são tratadas:
- Quando você inicializa o seu,
LineRecordReader
ele tenta instanciar um, LineReader
que é uma abstração para poder ler as linhas FSDataInputStream
. Existem 2 casos:
- Se houver um
CompressionCodec
definido, este codec é responsável por lidar com os limites. Provavelmente não é relevante para sua pergunta.
Se não houver nenhum codec, entretanto, é aí que as coisas são interessantes: se o start
de seu InputSplit
for diferente de 0, então você volta 1 caractere e pula a primeira linha que encontrar identificada por \ n ou \ r \ n (Windows) ! O retrocesso é importante porque, no caso de seus limites de linha serem iguais aos limites de divisão, isso garante que você não ignore a linha válida. Aqui está o código relevante:
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
Portanto, como as divisões são calculadas no cliente, os mapeadores não precisam ser executados em sequência, todo mapeador já sabe se precisa descartar a primeira linha ou não.
Então, basicamente, se você tiver 2 linhas de cada 100Mb no mesmo arquivo, e para simplificar vamos dizer que o tamanho da divisão é 64Mb. Então, quando as divisões de entrada são calculadas, teremos o seguinte cenário:
- Divisão 1 contendo o caminho e os hosts para este bloco. Inicializado no início 200-200 = 0 MB, comprimento 64 MB.
- A divisão 2 foi inicializada no início 200-200 + 64 = 64Mb, comprimento 64Mb.
- A divisão 3 foi inicializada no início 200-200 + 128 = 128Mb, comprimento 64Mb.
- A divisão 4 foi inicializada no início 200-200 + 192 = 192Mb, comprimento 8Mb.
- O mapeador A processará a divisão 1, o início é 0, então não pule a primeira linha e leia uma linha inteira que vai além do limite de 64 MB, portanto, precisa de leitura remota.
- O mapeador B processará a divisão 2, o início é! = 0, então pule a primeira linha após 64 MB-1byte, que corresponde ao final da linha 1 em 100 MB que ainda está na divisão 2, temos 28 MB da linha na divisão 2, então remoto ler os 72Mb restantes.
- O mapeador C processará a divisão 3, o início é! = 0, então pule a primeira linha após 128Mb-1byte, que corresponde ao final da linha 2 a 200Mb, que é o fim do arquivo, então não faça nada.
- O mapeador D é igual ao mapeador C, exceto que procura uma nova linha após 192Mb-1byte.
LineReader.readLine
função, não acho que seja relevante para a sua pergunta, mas posso adicionar mais detalhes se necessário.\r\n, \n
representa o truncamento de registro)?O algoritmo Map Reduce não funciona em blocos físicos do arquivo. Ele funciona em divisões de entrada lógicas. A divisão da entrada depende de onde o registro foi escrito. Um registro pode abranger dois mapeadores.
Da maneira como o HDFS foi configurado, ele divide arquivos muito grandes em grandes blocos (por exemplo, medindo 128 MB) e armazena três cópias desses blocos em nós diferentes no cluster.
O HDFS não tem conhecimento do conteúdo desses arquivos. Um registro pode ter sido iniciado no Bloco-a, mas o final desse registro pode estar presente no Bloco-b .
Para resolver esse problema, o Hadoop usa uma representação lógica dos dados armazenados em blocos de arquivos, conhecidos como divisões de entrada. Quando um cliente de trabalho MapReduce calcula as divisões de entrada , ele descobre onde o primeiro registro inteiro em um bloco começa e onde termina o último registro no bloco .
O ponto chave:
Nos casos em que o último registro em um bloco está incompleto, a divisão de entrada inclui informações de localização para o próximo bloco e o deslocamento de byte dos dados necessários para completar o registro.
Dê uma olhada no diagrama abaixo.
Dê uma olhada neste artigo e na pergunta SE relacionada: Sobre a divisão de arquivos Hadoop / HDFS
Mais detalhes podem ser lidos na documentação
A estrutura Map-Reduce depende do InputFormat do trabalho para:
InputSplit[] getSplits(JobConf job,int numSplits
) é a API para cuidar dessas coisas.FileInputFormat , que estende o método
InputFormat
implementadogetSplits
(). Dê uma olhada na parte interna deste método em grepcodefonte
Eu vejo isso da seguinte maneira: InputFormat é responsável por dividir os dados em divisões lógicas, levando em consideração a natureza dos dados.
Nada impede que isso aconteça, embora possa adicionar latência significativa ao trabalho - toda a lógica e leitura em torno dos limites de tamanho de divisão desejados acontecerão no rastreador de trabalho.
O formato de entrada com reconhecimento de registro mais simples é TextInputFormat. Está funcionando da seguinte forma (pelo que entendi pelo código) - formato de entrada cria divisões por tamanho, independentemente das linhas, mas LineRecordReader sempre:
a) Pule a primeira linha na divisão (ou parte dela), se não for a primeira divisão
b) Lê uma linha após o limite da divisão no final (se houver dados disponíveis, portanto, não é a última divisão).
fonte
Skip first line in the split (or part of it), if it is not the first split
- se o primeiro registro no bloco que não for o primeiro estiver completo, não tenho certeza de como essa lógica funcionará.Pelo que entendi, quando o
FileSplit
é inicializado para o primeiro bloco, o construtor padrão é chamado. Portanto, os valores de início e comprimento são zero inicialmente. Ao final do processamento do primeiro bloco, se a última linha estiver incompleta, então o valor do comprimento será maior do que o comprimento da divisão e lerá a primeira linha do próximo bloco também. Devido a isso, o valor de início para o primeiro bloco será maior que zero e nesta condição, oLineRecordReader
irá pular a primeira linha do segundo bloco. (Ver fonte )Caso a última linha do primeiro bloco esteja completa, então o valor do comprimento será igual ao comprimento do primeiro bloco e o valor do início do segundo bloco será zero. Nesse caso, o
LineRecordReader
não irá pular a primeira linha e ler o segundo bloco do início.Faz sentido?
fonte
Do código-fonte hadoop de LineRecordReader.java o construtor: Encontro alguns comentários:
a partir disso, acredito que o hadoop lerá uma linha extra para cada divisão (no final da divisão atual, lerá a próxima linha na próxima divisão) e, se não for a primeira divisão, a primeira linha será descartada. de modo que nenhum registro de linha seja perdido e incompleto
fonte
Os mapeadores não precisam se comunicar. Os blocos de arquivo estão em HDFS e o mapeador atual (RecordReader) pode ler o bloco que possui a parte restante da linha. Isso acontece nos bastidores.
fonte