Em muitas situações da vida real em que você aplica o MapReduce, os algoritmos finais acabam sendo várias etapas do MapReduce.
ou seja, Mapa1, Reduzir1, Mapa2, Reduzir2 e assim por diante.
Portanto, você tem a saída da última redução necessária como entrada para o próximo mapa.
Os dados intermediários são algo que você (em geral) não deseja manter depois que o pipeline for concluído com êxito. Também porque esses dados intermediários são, em geral, alguma estrutura de dados (como um 'mapa' ou um 'conjunto'), você não deseja colocar muito esforço na escrita e leitura desses pares de valores-chave.
Qual é a maneira recomendada de fazer isso no Hadoop?
Existe um exemplo (simples) que mostra como lidar com esses dados intermediários da maneira correta, incluindo a limpeza posterior?
Respostas:
Acho que este tutorial na rede de desenvolvedores do Yahoo o ajudará com isso: Encadeamento de empregos
Você usa o
JobClient.runJob()
. O caminho de saída dos dados do primeiro trabalho se torna o caminho de entrada para o seu segundo trabalho. Eles precisam ser transmitidos como argumentos para seus trabalhos com o código apropriado para analisá-los e configurar os parâmetros para o trabalho.Eu acho que o método acima pode, no entanto, ser o que a API mapred agora mais antiga fez, mas ainda deve funcionar. Haverá um método semelhante na nova API mapreduce, mas não tenho certeza do que seja.
No que diz respeito à remoção de dados intermediários após a conclusão de um trabalho, você pode fazer isso no seu código. A maneira que eu fiz isso antes está usando algo como:
Onde o caminho é o local no HDFS dos dados. Você precisa certificar-se de excluir esses dados apenas uma vez que nenhum outro trabalho exija.
fonte
Existem várias maneiras de fazer isso.
(1) Trabalhos em cascata
Crie o objeto JobConf "job1" para o primeiro trabalho e defina todos os parâmetros com "input" como diretório de entrada e "temp" como diretório de saída. Execute este trabalho:
Imediatamente abaixo dele, crie o objeto JobConf "job2" para o segundo trabalho e defina todos os parâmetros com "temp" como diretório de entrada e "saída" como diretório de saída. Execute este trabalho:
(2) Crie dois objetos JobConf e defina todos os parâmetros neles exatamente como (1), exceto que você não usa o JobClient.run.
Em seguida, crie dois objetos Job com jobconfs como parâmetros:
Usando o objeto jobControl, você especifica as dependências do trabalho e, em seguida, executa os trabalhos:
(3) Se você precisar de uma estrutura parecida com o Mapa + | Reduzir | Mapa *, você pode usar as classes ChainMapper e ChainReducer que acompanham o Hadoop versão 0.19 em diante.
fonte
Na verdade, existem várias maneiras de fazer isso. Vou me concentrar em dois.
Uma é via Riffle ( http://github.com/cwensel/riffle ), uma biblioteca de anotações para identificar itens dependentes e "executá-los" em ordem de dependência (topológica).
Ou você pode usar uma cascata (e MapReduceFlow) em cascata ( http://www.cascading.org/ ). Uma versão futura suportará anotações Riffle, mas agora funciona muito bem com trabalhos brutos do MR JobConf.
Uma variante disso é não gerenciar tarefas de MR manualmente, mas desenvolver seu aplicativo usando a API em cascata. Em seguida, o JobConf e o encadeamento de trabalhos são tratados internamente por meio do planejador em cascata e das classes Flow.
Dessa forma, você gasta seu tempo concentrando-se no seu problema, não na mecânica do gerenciamento de tarefas do Hadoop, etc. Você pode até colocar diferentes idiomas no topo (como clojure ou jruby) para simplificar ainda mais seu desenvolvimento e aplicativos. http://www.cascading.org/modules.html
fonte
Fiz o encadeamento de tarefas usando os objetos JobConf, um após o outro. Tomei o exemplo do WordCount para encadear os trabalhos. Um trabalho descobre quantas vezes uma palavra é repetida na saída especificada. O segundo trabalho recebe a saída do primeiro trabalho como entrada e calcula o total de palavras na entrada fornecida. Abaixo está o código que precisa ser colocado na classe Driver.
O comando para executar esses trabalhos é:
Total / jar do bin / hadoop.
Precisamos dar o nome dos trabalhos finais para o comando. No caso acima, são TotalWords.
fonte
Você pode executar a cadeia MR da maneira indicada no código.
ATENÇÃO : Apenas o código do driver foi fornecido
A SEQUÊNCIA É
( JOB1 ) MAP-> REDUZIR-> ( JOB2 ) MAP
Isso foi feito para obter as chaves classificadas, mas existem outras maneiras, como usar um mapa de árvore.
No entanto, quero focar sua atenção na maneira como os trabalhos foram encadeados! !
Obrigado
fonte
Você pode usar o oozie para processar os seus trabalhos do MapReduce. http://issues.apache.org/jira/browse/HADOOP-5303
fonte
Existem exemplos no projeto Apache Mahout que interligam várias tarefas do MapReduce. Um dos exemplos pode ser encontrado em:
RecommendenderJob.java
http://search-lucene.com/c/Mahout:/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java%7C%7CRecommenderJob
fonte
Podemos fazer uso do
waitForCompletion(true)
método do trabalho para definir a dependência entre o trabalho.No meu cenário, eu tinha três empregos que eram dependentes um do outro. Na classe do driver, usei o código abaixo e funciona conforme o esperado.
fonte
A nova classe org.apache.hadoop.mapreduce.lib.chain.ChainMapper ajuda esse cenário
fonte
Embora existam mecanismos complexos de fluxo de trabalho do Hadoop baseados em servidor, por exemplo, oozie, eu tenho uma biblioteca java simples que permite a execução de várias tarefas do Hadoop como um fluxo de trabalho. A configuração do trabalho e o fluxo de trabalho que definem a dependência entre trabalhos são configurados em um arquivo JSON. Tudo é configurável externamente e não requer nenhuma alteração no mapa existente, reduzindo a implementação para fazer parte de um fluxo de trabalho.
detalhes podem ser encontrados aqui. O código-fonte e o jar estão disponíveis no github.
http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/
Pranab
fonte
Acho que o oozie ajuda os trabalhos consequentes a receber os insumos diretamente do trabalho anterior. Isso evita a operação de E / S executada com o jobcontrol.
fonte
Se você deseja encadear programaticamente seus trabalhos, desejará usar o JobControl. O uso é bastante simples:
Depois disso, você adiciona instâncias de ControlledJob. O ControlledJob define um trabalho com suas dependências, conectando automaticamente entradas e saídas para caber em uma "cadeia" de trabalhos.
inicia a corrente. Você vai querer colocar isso em um tópico específico. Isso permite verificar o status da sua cadeia enquanto ela é executada:
fonte
Como você mencionou no seu requisito que deseja que o / p do MRJob1 seja o i / p do MRJob2 e assim por diante, considere o uso do fluxo de trabalho do oozie para esta base de dados. Além disso, você pode considerar gravar seus dados intermediários no HDFS, pois eles serão usados pelo próximo MRJob. E após a conclusão do trabalho, você pode limpar seus dados intermediários.
fonte