Ответ 1
Сортировка обычно должна выполняться до вызова команды collect(), так как она возвращает набор данных в программу драйвера, а также то, как задание с сокращением картой hasoop будет запрограммировано в java, так что окончательный вывод, который вы хотите записать (обычно) в HDFS. Благодаря искровому API этот подход обеспечивает гибкость записи вывода в "сырой" форме, где вы хотите, например, в файл, где он может использоваться как вход для дальнейшей обработки.
Использование искры scala Сортировка API перед сборкой() может быть выполнена по предложению eliasah и с использованием Tuple2.swap() дважды, один раз перед сортировкой и один раз после этого, чтобы создать список кортежей, отсортированных по возрастанию или уменьшению порядка их второе поле (которое называется _2) и содержит количество слов в их первом поле (с именем _1). Ниже приведен пример того, как это выполняется в искровой оболочке:
// this whole block can be pasted in spark-shell in :paste mode followed by <Ctrl>D
val file = sc.textFile("some_local_text_file_pathname")
val wordCounts = file.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _, 1) // 2nd arg configures one task (same as number of partitions)
.map(item => item.swap) // interchanges position of entries in each tuple
.sortByKey(true, 1) // 1st arg configures ascending sort, 2nd arg configures one task
.map(item => item.swap)
Чтобы отменить порядок сортировки, используйте sortByKey (false, 1), так как его первым аргументом является булевское значение возрастания. Его вторым аргументом является количество задач (равное количеству разделов), которое установлено равным 1 для тестирования с небольшим входным файлом, где требуется только один файл выходных данных; reduceByKey также принимает этот необязательный аргумент.
После этого wordCounts RDD можно сохранить как текстовые файлы в каталог с saveAsTextFile (имя_каталога), в который будут внесены один или несколько файлов part-xxxxx (начиная с part-00000) в зависимости от количества редукторов, настроенных для задания (1 файл выходных данных на редуктор), файл _SUCCESS в зависимости от того, выполнено ли задание или нет, и .crc файлы.
Используя pyspark, питон script, очень похожий на scala script, показанный выше, дает результат, который фактически тот же. Вот версия pyspark, демонстрирующая сортировку коллекции по значению:
file = sc.textFile("file:some_local_text_file_pathname")
wordCounts = file.flatMap(lambda line: line.strip().split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b, 1) \ # last arg configures one reducer task
.map(lambda (a, b): (b, a)) \
.sortByKey(1, 1) \ # 1st arg configures ascending sort, 2nd configures 1 task
.map(lambda (a, b): (b, a))
Чтобы отсортироватьKey в порядке убывания, его первый arg должен быть 0. Поскольку python захватывает ведущее и конечное пробелы в качестве данных, strip() вставлен перед разбиением каждой строки на пробелы, но это необязательно с использованием spark-shell/ scala.
Основное различие в выводе словарной строки и версии python wordCount заключается в том, что при искровых выводах (word, 3) выходы python (u'word ', 3).
Для получения дополнительной информации об искровых методах RDD см. http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html для python и https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD для scala.
В искровой оболочке running collect() в wordCounts преобразует его из RDD в Array [(String, Int)] = Array [Tuple2 (String, Int)], который сам может быть отсортирован во втором поле каждый элемент Tuple2, используя:
Array.sortBy(_._2)
sortBy также принимает необязательный неявный аргумент math.Ordering, такой как Ромео Киенцлер, показал в предыдущем ответе на этот вопрос. Array.sortBy(_._ 2) будет делать обратный вид элементов массива Array Tuple2 в своих _2-областях, просто определяя неявное обратное упорядочение перед запуском map-reduce script, потому что он переопределяет ранее существовавшее упорядочение Int. Обратный int Заказ, уже определенный Ромео Киенцлером:
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) = a.compare(b)*(-1)
}
Другим распространенным способом определения этого обратного упорядочения является изменение порядка a и b и падение (-1) в правой части определения сравнения:
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) = b.compare(a)
}