Объединение нескольких заданий MapReduce в Hadoop
Во многих реальных ситуациях, когда вы применяете MapReduce, конечные алгоритмы заканчиваются несколькими шагами MapReduce.
то есть. Map1, Reduce1, Map2, Reduce2 и т.д.
Итак, у вас есть результат последнего сокращения, необходимого в качестве ввода для следующей карты.
Промежуточные данные - это то, что вы (в общем) не хотите сохранять после успешного завершения конвейера. Кроме того, поскольку эти промежуточные данные в целом представляют собой некоторую структуру данных (например, "карта" или "набор" ), вы не хотите прикладывать слишком много усилий для написания и чтения этих пар ключ-значение.
Каков рекомендуемый способ сделать это в Hadoop?
Есть ли (простой) пример, показывающий, как правильно обрабатывать эти промежуточные данные, включая очистку после?
Ответы
Ответ 1
Я думаю, что этот учебник по сети разработчиков Yahoo поможет вам в этом: Цепочные задания
Вы используете JobClient.runJob()
. Выходной путь данных из первого задания становится входным путем для вашего второго задания. Они должны быть переданы в качестве аргументов для ваших заданий с соответствующим кодом для их анализа и настройки параметров для задания.
Я думаю, что вышеупомянутый метод мог бы быть тем, как это сделал ранее устаревший API-интерфейс, но он все равно должен работать. В новом API-интерфейсе mapreduce будет аналогичный метод, но я не уверен, что это такое.
Что касается удаления промежуточных данных после завершения задания, вы можете сделать это в своем коде. То, как я это делал раньше, использует что-то вроде:
FileSystem.delete(Path f, boolean recursive);
Если путь - это местоположение на HDFS данных. Вы должны убедиться, что вы удаляете эти данные только после того, как это не требует другой работы.
Ответ 2
Есть много способов сделать это.
(1) Каскадные задания
Создайте объект JobConf "job1" для первого задания и установите все параметры с "input" в качестве inputdirectory и "temp" в качестве выходного каталога. Выполните эту работу:
JobClient.run(job1).
Сразу под ним создайте объект JobConf "job2" для второго задания и задайте все параметры с помощью "temp" в качестве входного каталога и "output" в качестве выходного каталога. Выполните эту работу:
JobClient.run(job2).
(2) Создайте два объекта JobConf и задайте в них все параметры, как (1), за исключением того, что вы не используете JobClient.run.
Затем создайте два объекта Job с параметрами jobconfs:
Job job1=new Job(jobconf1);
Job job2=new Job(jobconf2);
Используя объект jobControl, вы указываете зависимости задания и затем запускаете задания:
JobControl jbcntrl=new JobControl("jbcntrl");
jbcntrl.addJob(job1);
jbcntrl.addJob(job2);
job2.addDependingJob(job1);
jbcntrl.run();
(3) Если вам нужна структура, похожая на Map+ | Уменьшить | Map *, вы можете использовать классы ChainMapper и ChainReducer, которые поставляются с Hadoop версии 0.19 и выше.
Ответ 3
На самом деле существует несколько способов сделать это. Я сосредоточусь на двух.
Один из них - через Riffle (http://github.com/cwensel/riffle) библиотеку аннотаций для идентификации зависимых вещей и "выполнения" их в зависимости (топологическом) порядке.
Или вы можете использовать Cascade (и MapReduceFlow) в каскадировании (http://www.cascading.org/). Будущая версия будет поддерживать аннотации Riffle, но теперь она отлично работает с необработанными заданиями Job Job Job JobMon.
Вариант этого заключается в том, чтобы вручную не выполнять задания MR вручную, а разрабатывать ваше приложение с помощью Cascading API. Затем JobConf и цепочка заданий обрабатываются внутренне через каскадные планировщики и классы Flow.
Таким образом, вы уделяете время сосредоточению внимания на своей проблеме, а не на механизме управления рабочими местами Hadoop и т.д. Вы даже можете сложить разные языки сверху (например, clojure или jruby), чтобы еще больше упростить разработку и приложения. http://www.cascading.org/modules.html
Ответ 4
Я сделал цепочку заданий используя объекты JobConf один за другим. Я взял пример WordCount для цепочки заданий. Одна работа вычисляет, сколько раз слово повторяется в данном выводе. Второе задание принимает выходные данные первого задания и вычисляет общее количество слов в заданном вводе. Ниже приведен код, который необходимо поместить в класс Driver.
//First Job - Counts, how many times a word encountered in a given file
JobConf job1 = new JobConf(WordCount.class);
job1.setJobName("WordCount");
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
job1.setMapperClass(WordCountMapper.class);
job1.setCombinerClass(WordCountReducer.class);
job1.setReducerClass(WordCountReducer.class);
job1.setInputFormat(TextInputFormat.class);
job1.setOutputFormat(TextOutputFormat.class);
//Ensure that a folder with the "input_data" exists on HDFS and contains the input files
FileInputFormat.setInputPaths(job1, new Path("input_data"));
//"first_job_output" contains data that how many times a word occurred in the given file
//This will be the input to the second job. For second job, input data name should be
//"first_job_output".
FileOutputFormat.setOutputPath(job1, new Path("first_job_output"));
JobClient.runJob(job1);
//Second Job - Counts total number of words in a given file
JobConf job2 = new JobConf(TotalWords.class);
job2.setJobName("TotalWords");
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
job2.setMapperClass(TotalWordsMapper.class);
job2.setCombinerClass(TotalWordsReducer.class);
job2.setReducerClass(TotalWordsReducer.class);
job2.setInputFormat(TextInputFormat.class);
job2.setOutputFormat(TextOutputFormat.class);
//Path name for this job should match first job output path name
FileInputFormat.setInputPaths(job2, new Path("first_job_output"));
//This will contain the final output. If you want to send this jobs output
//as input to third job, then third jobs input path name should be "second_job_output"
//In this way, jobs can be chained, sending output one to other as input and get the
//final output
FileOutputFormat.setOutputPath(job2, new Path("second_job_output"));
JobClient.runJob(job2);
Команда для запуска этих заданий:
корзина для бин/хадупа TotalWords.
Нам нужно дать имя окончательной работы для команды. В приведенном выше случае это TotalWords.
Ответ 5
Вы можете запустить цепочку MR в порядке, указанном в коде.
ПОЖАЛУЙСТА, ОБРАТИТЕ ВНИМАНИЕ: указан только код драйвера
public class WordCountSorting {
// here the word keys shall be sorted
//let us write the wordcount logic first
public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException {
//THE DRIVER CODE FOR MR CHAIN
Configuration conf1=new Configuration();
Job j1=Job.getInstance(conf1);
j1.setJarByClass(WordCountSorting.class);
j1.setMapperClass(MyMapper.class);
j1.setReducerClass(MyReducer.class);
j1.setMapOutputKeyClass(Text.class);
j1.setMapOutputValueClass(IntWritable.class);
j1.setOutputKeyClass(LongWritable.class);
j1.setOutputValueClass(Text.class);
Path outputPath=new Path("FirstMapper");
FileInputFormat.addInputPath(j1,new Path(args[0]));
FileOutputFormat.setOutputPath(j1,outputPath);
outputPath.getFileSystem(conf1).delete(outputPath);
j1.waitForCompletion(true);
Configuration conf2=new Configuration();
Job j2=Job.getInstance(conf2);
j2.setJarByClass(WordCountSorting.class);
j2.setMapperClass(MyMapper2.class);
j2.setNumReduceTasks(0);
j2.setOutputKeyClass(Text.class);
j2.setOutputValueClass(IntWritable.class);
Path outputPath1=new Path(args[1]);
FileInputFormat.addInputPath(j2, outputPath);
FileOutputFormat.setOutputPath(j2, outputPath1);
outputPath1.getFileSystem(conf2).delete(outputPath1, true);
System.exit(j2.waitForCompletion(true)?0:1);
}
}
ПОСЛЕДОВАТЕЛЬНОСТЬ ЕСТЬ
(JOB1) MAP-> REDUCE-> (JOB2) MAP
Это было сделано для сортировки ключей, но есть и другие способы, такие как использование древовидной карты
И все же я хочу сосредоточить ваше внимание на том, как Джобс был прикован !!
Спасибо
Ответ 6
Вы можете использовать oozie для обработки barch ваших заданий MapReduce. http://issues.apache.org/jira/browse/HADOOP-5303
Ответ 7
В проекте Apache Mahout есть примеры, которые объединяют несколько заданий MapReduce. Один из примеров можно найти по адресу:
RecommenderJob.java
http://search-lucene.com/c/Mahout:/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java%7C%7CRecommenderJob
Ответ 8
Мы можем использовать метод waitForCompletion(true)
для задания, чтобы определить зависимость между заданием.
В моем сценарии у меня было 3 работы, которые зависели друг от друга. В классе драйверов я использовал приведенный ниже код, и он работает, как и ожидалось.
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
CCJobExecution ccJobExecution = new CCJobExecution();
Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]);
Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]);
Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]);
System.out.println("****************Started Executing distanceTimeFraudJob ================");
distanceTimeFraudJob.submit();
if(distanceTimeFraudJob.waitForCompletion(true))
{
System.out.println("=================Completed DistanceTimeFraudJob================= ");
System.out.println("=================Started Executing spendingFraudJob ================");
spendingFraudJob.submit();
if(spendingFraudJob.waitForCompletion(true))
{
System.out.println("=================Completed spendingFraudJob================= ");
System.out.println("=================Started locationFraudJob================= ");
locationFraudJob.submit();
if(locationFraudJob.waitForCompletion(true))
{
System.out.println("=================Completed locationFraudJob=================");
}
}
}
}
Ответ 9
Новый класс org.apache.hadoop.mapreduce.lib.chain.ChainMapper поможет в этом сценарии
Ответ 10
Хотя существуют сложные серверные процессы Hadoop на основе сервера, например, oozie, у меня есть простая java-библиотека, которая позволяет выполнять несколько заданий Hadoop в качестве рабочего процесса. Конфигурация заданий и рабочий процесс, определяющие зависимость между заданиями, настраиваются в файле JSON. Все настраивается извне и не требует каких-либо изменений в реализации существующей карты, чтобы быть частью рабочего процесса.
Подробности можно найти здесь. Исходный код и банку доступны в github.
http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/
Пранаб
Ответ 11
Я думаю, что oozie помогает последующим заданиям получать данные непосредственно от предыдущей работы. Это позволяет избежать операции ввода-вывода, выполняемой с помощью управления заданиями.
Ответ 12
Если вы хотите программно связать свою работу, вы захотите использовать JobControl. Использование довольно просто:
JobControl jobControl = new JobControl(name);
После этого вы добавляете экземпляры ControlledJob. ControlledJob определяет работу с ее зависимостями, таким образом автоматически подключая входы и выходы, чтобы соответствовать "цепочке" заданий.
jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2));
jobControl.run();
запускает цепочку. Вы захотите поместить это в поток скорости. Это позволяет проверять состояние вашей цепочки во время ее работы:
while (!jobControl.allFinished()) {
System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size());
System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size());
System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size());
List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList();
System.out.println("Jobs in success state: " + successfulJobList.size());
List<ControlledJob> failedJobList = jobControl.getFailedJobList();
System.out.println("Jobs in failed state: " + failedJobList.size());
}
Ответ 13
Как вы упомянули в своем требовании, что вы хотите, чтобы o/p MRJob1 был i/p MRJob2 и т.д., Вы можете рассмотреть возможность использования рабочего процесса oozie для этого варианта использования. Также вы можете записать свои промежуточные данные в HDFS, так как они будут использованы в следующем MRJob. И после завершения работы вы можете очистить промежуточные данные.
<start to="mr-action1"/>
<action name="mr-action1">
<!-- action for MRJob1-->
<!-- set output path = /tmp/intermediate/mr1-->
<ok to="end"/>
<error to="end"/>
</action>
<action name="mr-action2">
<!-- action for MRJob2-->
<!-- set input path = /tmp/intermediate/mr1-->
<ok to="end"/>
<error to="end"/>
</action>
<action name="success">
<!-- action for success-->
<ok to="end"/>
<error to="end"/>
</action>
<action name="fail">
<!-- action for fail-->
<ok to="end"/>
<error to="end"/>
</action>
<end name="end"/>