Spark: разница семантики между сокращением и уменьшениемByKey
В документации Spark говорится, что метод RDD reduce
требует ассоциативной и коммутативной двоичной функции.
Однако метод reduceByKey
ТОЛЬКО требует ассоциативной двоичной функции.
sc.textFile("file4kB", 4)
Я сделал несколько тестов, и, видимо, это поведение, которое я получаю. Почему это различие? Почему reduceByKey
гарантирует, что двоичная функция всегда применяется в определенном порядке (для обеспечения отсутствия коммутативности), если reduce
не работает?
Пример, если нагрузка немного (маленького) текста с 4 разделами (минимум):
val r = sc.textFile("file4k", 4)
то
r.reduce(_ + _)
возвращает строку, где части не всегда находятся в одном порядке, тогда как:
r.map(x => (1,x)).reduceByKey(_ + _).first
всегда возвращает ту же строку (где все находится в том же порядке, что и в исходном файле).
(я проверил с r.glom
, и содержимое файла действительно распространяется на 4 раздела, нет пустого раздела).
Ответы
Ответ 1
Насколько мне известно, это ошибка в документации и результаты, которые вы видите, просто случайны. Практика, другие ресурсы и простой анализ кода показывают, что функция, переданная в reduceByKey
, не должна быть только ассоциативный, но коммутативный.
-
практика - в то время как похоже, что порядок сохраняется в локальном режиме, это уже не так, когда вы запускаете Spark в кластере, включая автономный режим.
-
другие ресурсы - процитировать Исследование данных с использованием Spark из AmpCamp 3:
Для этого шаблона есть удобный метод, называемый reduceByKey в Spark. Обратите внимание, что второй аргумент reduceByKey определяет количество используемых редукторов. По умолчанию Spark предполагает, что функция уменьшения является коммутативной и ассоциативной и применяет комбинаторы на стороне отображения.
-
code - reduceByKey
реализуется с помощью combineByKeyWithClassTag
и создает ShuffledRDD
. Поскольку Spark не гарантирует порядок после перетасовки, единственный способ восстановить его - это привязать некоторые метаданные к частично уменьшенным записям. Насколько я могу сказать, ничего подобного не происходит.
В стороне примечание reduce
, поскольку оно реализовано в PySpark, будет работать отлично с функцией, которая только коммутативна. Это, конечно, только деталь реализации, а не часть контракта.
Ответ 2
В соответствии с документацией кода, недавно обновленной/исправленной. (спасибо @zero323):
reduceByKey
объединяет значения для каждого ключа с помощью ассоциативной и коммутативной функции сокращения. Это также будет выполнять слияние локально на каждом картографе перед отправкой результатов в редуктор, аналогично "объединителю" в MapReduce.
Таким образом, на самом деле на самом деле ошибка документации, подобная @zero323, указала в его ответе.
Вы можете проверить следующие ссылки на код, чтобы убедиться: