Как назначить уникальные смежные числа элементам в Spark RDD
У меня есть набор данных (user, product, review)
, и я хочу передать его в алгоритм ALS mllib.
Алгоритм требует, чтобы пользователи и продукты были числами, а мои - именами строк и строковыми SKU.
Сейчас я получаю отдельных пользователей и SKU, а затем присваиваю им числовые идентификаторы вне Spark.
Мне было интересно, есть ли лучший способ сделать это. Один из подходов, о котором я думал, заключается в написании настраиваемого RDD, который по существу перечисляет от 1 до n
, а затем вызывает zip на двух RDD.
Ответы
Ответ 1
Начиная с Spark 1.0 есть два метода, которые вы можете легко решить:
-
RDD.zipWithIndex
похож на Seq.zipWithIndex
, он добавляет смежные (Long
) числа. Это нужно сначала подсчитать элементы в каждом разделе, поэтому ваш ввод будет оценен дважды. Кэш вашего входного RDD, если вы хотите использовать это.
-
RDD.zipWithUniqueId
также дает вам уникальные идентификаторы Long
, но они не гарантированно смежны. (Они будут только смежными, если каждый раздел имеет одинаковое количество элементов.) Потенциал заключается в том, что это не нужно ничего знать о вводе, поэтому он не будет вызывать двойную оценку.
Ответ 2
Для аналогичного примера использования, я просто хэшировал строковые значения. См. http://blog.cloudera.com/blog/2014/03/why-apache-spark-is-a-crossover-hit-for-data-scientists/
def nnHash(tag: String) = tag.hashCode & 0x7FFFFF
var tagHashes = postIDTags.map(_._2).distinct.map(tag =>(nnHash(tag),tag))
Похоже, вы уже делаете что-то подобное, хотя хэширование может быть проще в управлении.
Matei предложил здесь подход к эмуляции zipWithIndex
на RDD, что сводится к назначению идентификаторов внутри каждой части, которые будут глобально уникальными: https://groups.google.com/forum/#!topic/spark-users/WxXvcn2gl1E
Ответ 3
Другим простым вариантом, если использовать DataFrames и просто обеспокоен уникальностью, является использование функции MonotonicallyIncreasingID
import org.apache.spark.sql.functions.monotonicallyIncreasingId
val newDf = df.withColumn("uniqueIdColumn", monotonicallyIncreasingId)
Изменить: MonotonicallyIncreasingID
устарел и удален начиная с Spark 2.0; он теперь известен как monotonically_increasing_id
.
Ответ 4
monotonically_increasing_id() представляется ответом, но, к сожалению, он не будет работать для ALS, поскольку он производит 64-битные номера, и ALS ожидает 32 (см. мой комментарий ниже radek1st ответ для deets).
Решение, которое я нашел, заключается в использовании zipWithIndex(), как указано в ответе Дарабоса. Вот как это реализовать:
Если у вас уже есть один столбцовый DataFrame с вашими явными пользователями с именем userids
, вы можете создать таблицу поиска (LUT) следующим образом:
# PySpark code
user_als_id_LUT = sqlContext.createDataFrame(userids.rdd.map(lambda x: x[0]).zipWithIndex(), StructType([StructField("userid", StringType(), True),StructField("user_als_id", IntegerType(), True)]))
Теперь вы можете:
- Используйте этот LUT для получения ALS-дружественных идентификаторов целого числа, чтобы предоставить ALS
- Используйте этот LUT для обратного поиска, когда вам нужно вернуться с идентификатора ALS к исходному идентификатору
Сделайте то же самое для элементов.
Ответ 5
Люди уже рекомендовали monotonically_increasing_id() и упомянули проблему, что он создает Longs, а не Ints.
Однако, по моему опыту (caveat - Spark 1.6) - если вы используете его на одном исполнителе (перераспределение до 1 раньше), нет префикса исполнителя, и его можно безопасно перевести в Int. Очевидно, что вам нужно иметь меньше строк Integer.MAX_VALUE.