Кластер кластеров vs order by vs sort by
Насколько я понимаю;
-
сортировать только в редукторе
-
порядок по порядку вещей по всему миру, но сует все в один редукторы
-
Кластер по интеллектуально распределяет вещи в редукторы по ключевому хешу и делает сортировку по
Таким образом, мой вопрос - гарантирует ли кластер глобальный порядок? Распределить путем помещает те же ключи в те же редукторы, но как насчет смежных ключей?
Единственный документ, который я могу найти по этому вопросу, находится здесь, и из примера кажется, что он заказывает их по всему миру. Но из определения я чувствую, что это не всегда так.
Ответы
Ответ 1
Краткий ответ: да, CLUSTER BY
гарантирует глобальный порядок, если вы готовы присоединиться к нескольким выходным файлам самостоятельно.
Более длинная версия:
-
ORDER BY x
: гарантирует глобальный порядок, но делает это путем проталкивания всех данных через один редуктор. Это в принципе неприемлемо для больших наборов данных. В результате вы получите один отсортированный файл. -
SORT BY x
: упорядочивает данные в каждом из N редукторов, но каждый редуктор может получать перекрывающиеся диапазоны данных. В итоге вы получите N или более отсортированных файлов с перекрывающимися диапазонами. -
DISTRIBUTE BY x
: гарантирует, что каждый из N редукторов получит непересекающиеся диапазоны x
, но не сортирует выходные данные каждого редуктора. Вы получите N или более несортированных файлов с неперекрывающимися диапазонами. -
CLUSTER BY x
: гарантирует, что каждый из N редукторов получит непересекающиеся диапазоны, а затем сортирует по этим диапазонам в редукторах. Это дает вам глобальное упорядочение, и то же самое, что и при выполнении (DISTRIBUTE BY x
и SORT BY x
). Вы получите N или более отсортированных файлов с неперекрывающимися диапазонами.
Есть смысл? Таким образом, CLUSTER BY
является в основном более масштабируемой версией ORDER BY
.
Ответ 2
Позвольте мне пояснить в первую очередь: clustered by
распределяет ваши ключи только в разных ведрах, clustered by ... sorted by
сортирует сортировки.
С помощью простого эксперимента (см. ниже) вы можете увидеть, что по умолчанию вы не получите глобальный порядок. Причина в том, что по умолчанию разделитель разделяет ключи с использованием хеш-кодов независимо от фактического заказа клавиш.
Однако вы можете полностью упорядочить свои данные.
Мотивация - "Hadoop: окончательное руководство" Тома Уайта (3-е издание, глава 8, стр. 274, "Всего сортировка" ), где он обсуждает TotalOrderPartitioner.
Сначала я отвечу на ваш вопрос TotalOrdering, а затем опишу несколько экспериментов по улусу, связанных с сортировкой, которые я сделал.
Имейте в виду: то, что я описываю здесь, является "доказательством концепции", я смог обработать один пример, используя дистрибутив Claudera CDH3.
Изначально я надеялся, что org.apache.hadoop.mapred.lib.TotalOrderPartitioner сделает трюк. К сожалению, это не так, потому что это похоже на разделение Hive по значению, а не на ключ. Поэтому я исправляю его (должен иметь подкласс, но у меня нет на это времени):
Заменить
public int getPartition(K key, V value, int numPartitions) {
return partitions.findPartition(key);
}
с
public int getPartition(K key, V value, int numPartitions) {
return partitions.findPartition(value);
}
Теперь вы можете установить (исправленный) TotalOrderPartitioner как ваш обозреватель кустов:
hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
hive> set total.order.partitioner.natural.order=false
hive> set total.order.partitioner.path=/user/yevgen/out_data2
Я также использовал
hive> set hive.enforce.bucketing = true;
hive> set mapred.reduce.tasks=4;
в моих тестах.
Файл out_data2 сообщает TotalOrderPartitioner, как значения bucket.
Вы генерируете out_data2 путем отбора ваших данных. В моих тестах я использовал 4 ведра и ключи от 0 до 10. Я генерировал out_data2 с использованием ad-hoc-подхода:
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.fs.FileSystem;
public class TotalPartitioner extends Configured implements Tool{
public static void main(String[] args) throws Exception{
ToolRunner.run(new TotalPartitioner(), args);
}
@Override
public int run(String[] args) throws Exception {
Path partFile = new Path("/home/yevgen/out_data2");
FileSystem fs = FileSystem.getLocal(getConf());
HiveKey key = new HiveKey();
NullWritable value = NullWritable.get();
SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), partFile, HiveKey.class, NullWritable.class);
key.set( new byte[]{1,3}, 0, 2);//partition at 3; 1 came from Hive -- do not know why
writer.append(key, value);
key.set( new byte[]{1, 6}, 0, 2);//partition at 6
writer.append(key, value);
key.set( new byte[]{1, 9}, 0, 2);//partition at 9
writer.append(key, value);
writer.close();
return 0;
}
}
Затем я скопировал результат out_data2 в HDFS (в/user/yevgen/out_data2)
С этими настройками я получил свои данные в bucketed/sorted (см. последний пункт в моем списке экспериментов).
Вот мои эксперименты.
-
Создание типовых данных
bash > echo -e "1\n3\n2\n4\n5\n7\n6\n8\n9\n0" > data.txt
-
Создайте базовую тестовую таблицу:
hive > create table test (x int);
hive > загрузить данные local inpath 'data.txt' в таблицу test;
В основном эта таблица содержит значения от 0 до 9 без порядка.
-
Продемонстрируйте, как работает копирование таблицы (действительно параметр mapred.reduce.tasks, который устанавливает максимальное количество сокращаемых задач MAXIMAL)
hive > создать таблицу test2 (x int);
hive > set mapred.reduce.tasks = 4;
hive > вставить таблицу перезаписи test2
выберите a.x из теста a
тест присоединения b
на a.x = b.x; - глупое объединение, чтобы заставить нетривиальное сокращение карты
bash > hasoop fs -cat/user/hive/warehouse/test2/000001_0
1
5
9
-
Демонстрируйте балансировку. Вы можете видеть, что ключи выбираются случайным образом без какого-либо порядка сортировки:
hive > создать таблицу test3 (x int)
сгруппированы по (x) в 4 ведра;
hive > set hive.enforce.bucketing = true;
hive > вставить таблицу перезаписи test3
выберите * из теста;
bash > hadoop fs -cat/user/hive/warehouse/test3/000000_0
4
-
0
-
Букинг с сортировкой. Результаты частично отсортированы, а не полностью отсортированы.
hive > создать таблицу test4 (x int)
сгруппированные по (x), отсортированные по (x desc)
в 4 ведра;
hive > вставить таблицу перезаписи test4
выберите * из теста;
bash > hasoop fs -cat/user/hive/warehouse/test4/000001_0
1
5
9
Вы можете видеть, что значения отсортированы в порядке возрастания. Похож на ошибку Hive в CDH3?
-
Частично отсортировано без указания кластера:
hive > создать таблицу test5 как
выберите x
от испытания
распределять по х
sort by x desc;
bash > hasoop fs -cat/user/hive/warehouse/test5/000001_0
9
5
1
-
Используйте мой исправленный TotalOrderParitioner:
hive > set hive.mapred.partitioner = org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
hive > set total.order.partitioner.natural.order = false
hive > set total.order.partitioner.path =/user/training/out_data2
hive > создать таблицу test6 (x int)
сгруппированные по (x), отсортированные по (x) на 4 ковши,
hive > вставить таблицу перезаписи test6
выберите * из теста;
bash > hasoop fs -cat/user/hive/warehouse/test6/000000_0
1
2
0
bash > hasoop fs -cat/user/hive/warehouse/test6/000001_0
3
4
5
bash > hasoop fs -cat/user/hive/warehouse/test6/000002_0
7
6
-
bash > hasoop fs -cat/user/hive/warehouse/test6/000003_0
9
Ответ 3
Как я понимаю, короткий ответ - нет. Вы получите перекрывающиеся диапазоны.
Из документации SortBy: "Cluster By - это сокращение как для Distribute By, так и для Sort By". "Все строки с одинаковыми столбцами Распределить по будут идти к одному и тому же редуктору". Но нет информации, которая распространялась бы по гарантии непересекающихся диапазонов.
Более того, из документации DDL BucketedTables: "Как Hive распределяет строки по сегментам? В общем, номер сегмента определяется выражением hash_function (bucketing_column) mod num_buckets". Я предполагаю, что Cluster by в операторе Select использует тот же принцип для распределения строк между редукторами, потому что он используется главным образом для заполнения данных таблицами с интервалом.
Я создал таблицу с 1 целочисленным столбцом "а" и вставил туда цифры от 0 до 9.
Затем я установил число редукторов на 2 set mapred.reduce.tasks = 2;
,
И select
данные из этой таблицы с помощью Cluster by
clause select * from my_tab cluster by a;
И получил результат, который я ожидал:
0
2
4
6
8
1
3
5
7
9
Итак, первый редуктор (число 0) получил четные числа (потому что их режим 2 дает 0)
а второй редуктор (номер 1) получил нечетные числа (потому что их режим 2 дает 1)
Так что, как работает "Распределить по".
А затем "Сортировка по" сортирует результаты внутри каждого редуктора.
Ответ 4
CLUSTER BY не создает глобальное упорядочение.
Принятый ответ (Ларс Йенкен) вводит в заблуждение, заявляя, что редукторы получат неперекрывающиеся диапазоны. Поскольку Антон Завирюхин правильно указывает на документацию BucketedTables, CLUSTER BY в основном РАСПРОСТРАНЯЕТСЯ (так же, как bucketing) плюс SORT BY в каждом ковше/редукторе. И РАСПРОСТРАНЯЕТСЯ просто хэши и моды в ведра, а в то время как хеширующая функция может сохранить порядок (хеш i > хэш j, если i > j) mod hash value does not.
Здесь лучший пример, показывающий перекрывающиеся диапазоны
http://myitlearnings.com/bucketing-in-hive/
Ответ 5
Кластер на каждую сортировку редуктора не является глобальным. Во многих книгах также упоминается неправильно или смутно. Это особенно полезно, когда вы говорите, что вы распределяете каждый отдел на конкретный редуктор, а затем сортируете по имени сотрудника в каждом отделе и не заботитесь о том, чтобы какой-либо блок не использовался, и он больше выполняет ant по мере распределения рабочей нагрузки среди редукторов.
Ответ 6
SortBy: N или более отсортированных файлов с перекрывающимися диапазонами.
OrderBy: один выход, т.е. полностью упорядоченный.
Распределить по: Распределить За счет защиты каждый из N редукторов получает непересекающиеся диапазоны столбца, но не сортирует выходные данные каждого редуктора.
Для получения дополнительной информации http://commandstech.com/hive-sortby-vs-orderby-vs-distributeby-vs-clusterby/
ClusterBy: Обратитесь к тому же примеру, что и выше, если мы используем Cluster By x, два редуктора будут дополнительно сортировать строки по x: