Каков самый быстрый способ массовой загрузки данных в HBase программно?
У меня есть текстовый файл Plain с возможными миллионами строк, для которого требуется индивидуальный синтаксический анализ, и я хочу как можно быстрее загрузить его в таблицу HBase (используя клиент Hadoop или HBase Java).
Мое текущее решение основано на задании MapReduce без части "Уменьшить". Я использую FileInputFormat
для чтения текстового файла, чтобы каждая строка передавалась методу map
моего класса Mapper
. В этот момент строка анализируется с образованием объекта Put
, который записывается в context
. Затем TableOutputFormat
принимает объект Put
и вставляет его в таблицу.
Это решение дает среднюю скорость ввода 1000 рядов в секунду, что меньше, чем я ожидал. Моя настройка HBase находится в псевдораспределенном режиме на одном сервере.
Интересно, что при введении 1 000 000 строк, 25 Mappers (задачи) порождаются, но они запускаются поочередно (один за другим); это нормальная?
Вот код для моего текущего решения:
public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
protected void map(LongWritable key, Text value, Context context) throws IOException {
Map<String, String> parsedLine = parseLine(value.toString());
Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
for (String currentKey : parsedLine.keySet()) {
row.add(Bytes.toBytes(currentKey),Bytes.toBytes(currentKey),Bytes.toBytes(parsedLine.get(currentKey)));
}
try {
context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public int run(String[] args) throws Exception {
if (args.length != 2) {
return -1;
}
conf.set("hbase.mapred.outputtable", args[1]);
// I got these conf parameters from a presentation about Bulk Load
conf.set("hbase.hstore.blockingStoreFiles", "25");
conf.set("hbase.hregion.memstore.block.multiplier", "8");
conf.set("hbase.regionserver.handler.count", "30");
conf.set("hbase.regions.percheckin", "30");
conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");
Job job = new Job(conf);
job.setJarByClass(BulkLoadMapReduce.class);
job.setJobName(NAME);
TextInputFormat.setInputPaths(job, new Path(args[0]));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(CustomMap.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(TableOutputFormat.class);
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
Long startTime = Calendar.getInstance().getTimeInMillis();
System.out.println("Start time : " + startTime);
int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);
Long endTime = Calendar.getInstance().getTimeInMillis();
System.out.println("End time : " + endTime);
System.out.println("Duration milliseconds: " + (endTime-startTime));
System.exit(errCode);
}
Ответы
Ответ 1
Я прошел процесс, который, вероятно, очень похож на ваш, пытаясь найти эффективный способ загрузки данных из MR в HBase. То, что я нашел для работы, использует HFileOutputFormat
как OutputFormatClass для MR.
Ниже приведена основа моего кода, который я должен сгенерировать функцию job
и Mapper map
, которая записывает данные. Это было быстро. Мы больше не используем его, поэтому у меня нет номеров под рукой, но это было около 2,5 миллионов записей за минуту.
Вот функция (урезанная), которую я написал, чтобы сгенерировать задание для моего процесса MapReduce, чтобы поместить данные в HBase
private Job createCubeJob(...) {
//Build and Configure Job
Job job = new Job(conf);
job.setJobName(jobName);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper
job.setJarByClass(CubeBuilderDriver.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(HFileOutputFormat.class);
TextInputFormat.setInputPaths(job, hiveOutputDir);
HFileOutputFormat.setOutputPath(job, cubeOutputPath);
Configuration hConf = HBaseConfiguration.create(conf);
hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort);
HTable hTable = new HTable(hConf, tableName);
HFileOutputFormat.configureIncrementalLoad(job, hTable);
return job;
}
Это моя функция отображения из класса HiveToHBaseMapper
(слегка отредактирована).
public void map(WritableComparable key, Writable val, Context context)
throws IOException, InterruptedException {
try{
Configuration config = context.getConfiguration();
String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR);
String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY);
String column = strs[COLUMN_INDEX];
String Value = strs[VALUE_INDEX];
String sKey = generateKey(strs, config);
byte[] bKey = Bytes.toBytes(sKey);
Put put = new Put(bKey);
put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0)
? Bytes.toBytes(Double.MIN_VALUE)
: Bytes.toBytes(value));
ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
context.write(ibKey, put);
context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1);
}
catch(Exception e){
context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1);
}
}
Я уверен, что это не будет решением Copy & Paste для вас. Очевидно, что данные, с которыми я работал здесь, не нуждались в какой-либо пользовательской обработке (это было сделано в задании MR до этого). Главное, что я хочу выделить из этого, - это HFileOutputFormat
. Остальное - всего лишь пример того, как я его использовал.:)
Надеюсь, это поможет вам найти правильный путь к хорошему решению.
Ответ 2
Интересно, что при введении 1 000 000 строк, 25 Mappers (задачи) порождаются, но они запускаются поочередно (один за другим); это нормальная?
Параметр mapreduce.tasktracker.map.tasks.maximum
, который по умолчанию равен 2, определяет максимальное количество задач, которые могут выполняться параллельно на node. Если не изменено, вы должны увидеть одновременно 2 задания карты одновременно на каждом node.