Ответ 1
Проблема, с которой вы столкнулись, может быть разделена на следующие:
- Преобразование ваших рейтингов (я считаю) в
LabeledPoint
данные X. - Сохранение X в формате libsvm.
1. Преобразование ваших рейтингов в LabeledPoint
данные X
Давайте рассмотрим следующие необработанные рейтинги:
val rawRatings: Seq[String] = Seq("0,1,1.0", "0,3,3.0", "1,1,1.0", "1,2,0.0", "1,3,3.0", "3,3,4.0", "10,3,4.5")
Вы можете обрабатывать эти необработанные оценки как матрицу списка координат (COO).
Spark реализует распределенную матрицу, подкрепленную СДР своих записей: CoordinateMatrix
, где каждая запись является кортежем (i: Long, j: Long, value: Double).
Примечание. CoordinateMatrix следует использовать только в том случае, если оба размера матрицы огромны, а матрица очень разрежена. (как правило, это касается оценок пользователей/элементов).
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD
val data: RDD[MatrixEntry] =
sc.parallelize(rawRatings).map {
line => {
val fields = line.split(",")
val i = fields(0).toLong
val j = fields(1).toLong
val value = fields(2).toDouble
MatrixEntry(i, j, value)
}
}
Теперь давайте конвертируем этот RDD[MatrixEntry]
в CoordinateMatrix
и извлекаем проиндексированные строки:
val df = new CoordinateMatrix(data) // Convert the RDD to a CoordinateMatrix
.toIndexedRowMatrix().rows // Extract indexed rows
.toDF("label", "features") // Convert rows
2. Сохранение данных LabeledPoint в формате libsvm
Начиная с Spark 2.0, Вы можете сделать это, используя DataFrameWriter
. Давайте создадим небольшой пример с некоторыми фиктивными данными LabeledPoint (вы также можете использовать созданный ранее DataFrame
):
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
val df = Seq(neg,pos).toDF("label","features")
К сожалению, мы все еще не можем использовать DataFrameWriter
напрямую, потому что, хотя большинство компонентов конвейера поддерживают обратную совместимость для загрузки, некоторые существующие DataFrames и конвейеры в версиях Spark до 2.0, которые содержат векторные или матричные столбцы, могут нуждаться в миграции к новым векторным и матричным типам spark.ml.
Утилиты для преобразования столбцов DataFrame из типов mllib.linalg
в ml.linalg
(и наоборот) можно найти в org.apache.spark.mllib.util.MLUtils.
. В нашем случае нам нужно сделать следующее (как для фиктивных данных, так и для DataFrame
из step 1.
)
import org.apache.spark.mllib.util.MLUtils
// convert DataFrame columns
val convertedVecDF = MLUtils.convertVectorColumnsToML(df)
Теперь давайте сохраним DataFrame:
convertedVecDF.write.format("libsvm").save("data/foo")
И мы можем проверить содержимое файлов:
$ cat data/foo/part*
0.0 1:1.0 3:3.0
1.0 1:1.0 2:0.0 3:3.0
EDIT:
В текущей версии spark (2.1.0) нет необходимости использовать пакет mllib
. Вы можете просто сохранить данные LabeledPoint
в формате libsvm, как показано ниже:
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
val df = Seq(neg,pos).toDF("label","features")
df.write.format("libsvm").save("data/foo")