Spark RDD - как они работают
У меня небольшая программа Scala, которая отлично работает на одном node. Однако я масштабирую его, так что он работает на нескольких узлах. Это моя первая такая попытка. Я просто пытаюсь понять, как RDD работают в Spark, поэтому этот вопрос основан на теории и может быть не на 100% правильным.
Скажем, я создаю RDD:
val rdd = sc.textFile(file)
Теперь, когда я это сделал, значит ли это, что файл в file
теперь разделен по узлам (если все узлы имеют доступ к пути к файлу)?
Во-вторых, я хочу подсчитать количество объектов в RDD (достаточно просто), однако мне нужно использовать это число в вычислении, которое должно быть применено к объектам в RDD - пример псевдокода:
rdd.map(x => x / rdd.size)
Скажем, есть 100 объектов в rdd
и говорят, что есть 10 узлов, поэтому количество 10 объектов на node (при условии, что это так работает концепция RDD), теперь, когда я вызываю метод, каждый node будет выполнять расчет с rdd.size
как 10
или 100
? Поскольку, в общем случае, размер RDD 100
, но локально на каждом node равен всего 10
. Должен ли я сделать переменную широковещания до выполнения расчета? Этот вопрос связан с вопросом ниже.
Наконец, если я сделаю преобразование в RDD, например. rdd.map(_.split("-"))
, а затем мне нужен новый size
RDD, мне нужно выполнить действие на RDD, например count()
, поэтому вся информация отправляется обратно во драйвер node?
Ответы
Ответ 1
Обычно файл (или части файла, если он слишком большой) реплицируется на N узлов в кластере (по умолчанию N = 3 на HDFS). Это не намерение разбивать каждый файл между всеми доступными узлами.
Однако, для вас (то есть клиент), работающего с файлом с использованием Spark, должен быть прозрачным - вы не должны видеть разницы в rdd.size
, независимо от того, сколько узлов он разделяет и/или реплицирует. Существуют методы (по крайней мере, в Hadoop), чтобы узнать, какие узлы (части) файла могут быть расположены в данный момент. Однако в простых случаях вам, скорее всего, не понадобится использовать эту функциональность.
UPDATE: статья, описывающая внутренности RDD: https://cs.stanford.edu/~matei/papers/2012/nsdi_spark.pdf
Ответ 2
val rdd = sc.textFile(file)
Означает ли это, что файл теперь разбит на разделы по узлам?
Файл остается там, где он был. Элементы результирующего RDD[String]
являются строками файла. RDD разбивается на разделы, чтобы соответствовать естественному разделению базовой файловой системы. Количество разделов не зависит от количества узлов, которые у вас есть.
Важно понимать, что при выполнении этой строки он не читает файл (ы). RDD является ленивым объектом и будет делать что-то только тогда, когда это необходимо. Это здорово, потому что это позволяет избежать ненужного использования памяти.
Например, если вы пишете val errors = rdd.filter(line => line.startsWith("error"))
, ничего не происходит. Если вы затем напишете val errorCount = errors.count
, теперь ваша последовательность операций должна быть выполнена, потому что результат count
является целым числом. То, что каждый рабочий ядро (поток исполнителей) будет выполнять параллельно, читает файл (или часть файла), выполняет итерацию по его строкам и подсчитывает строки, начинающиеся с "ошибки". Буферизация и GC в сторону, только одна линия на ядро будет в памяти одновременно. Это позволяет работать с очень большими данными, не используя много памяти.
Я хочу подсчитать количество объектов в RDD, однако мне нужно использовать это число в вычислении, которое должно быть применено к объектам в RDD - пример псевдокода:
rdd.map(x => x / rdd.size)
Нет метода rdd.size
. Существует rdd.count
, который подсчитывает количество элементов в RDD. rdd.map(x => x / rdd.count)
не будет работать. Код попытается отправить переменную rdd
всем работникам и завершится с ошибкой NotSerializableException
. Что вы можете сделать, это:
val count = rdd.count
val normalized = rdd.map(x => x / count)
Это работает, потому что count
является Int
и может быть сериализовано.
Если я делаю преобразование в RDD, например. rdd.map(_.split("-"))
, а затем мне нужен новый размер RDD, мне нужно выполнить действие на RDD, например count()
, поэтому вся информация отправляется обратно в драйвер node?
map
не изменяет количество элементов. Я не знаю, что вы подразумеваете под "размером". Но да, вам нужно выполнить действие, например count
, чтобы получить что-либо из RDD. Вы видите, что никакая работа не выполняется до тех пор, пока вы не выполните действие. (Когда вы выполняете count
, только счет на каждый раздел будет отправлен обратно в драйвер, конечно, не "вся информация".)