Добавление двух RDD [mllib.linalg.Vector]
Мне нужно добавить две матрицы, которые хранятся в двух файлах.
Содержимое latest1.txt
и latest2.txt
имеет следующую строку:
1 2 3
4 5 6
7 8 9
Я читаю эти файлы следующим образом:
scala> val rows = sc.textFile("latest1.txt").map { line => val values = line.split(‘ ‘).map(_.toDouble)
Vectors.sparse(values.length,values.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
}
scala> val r1 = rows
r1: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at :14
scala> val rows = sc.textFile("latest2.txt").map { line => val values = line.split(‘ ‘).map(_.toDouble)
Vectors.sparse(values.length,values.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
}
scala> val r2 = rows
r2: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at :14
Я хочу добавить r1, r2. Итак, есть ли способ добавить эти два RDD[mllib.linalg.Vector]
в Apache-Spark.
Ответы
Ответ 1
Это действительно хороший вопрос. Я регулярно работаю с mllib и не понимаю, что эти основные операции с линейной алгеброй недоступны.
Дело в том, что базовые векторы breeze имеют все манипуляции с линейной алгеброй, которые вы ожидаете, в том числе, конечно, базовое добавление, которое вы конкретно упомянули.
Однако реализация бриза скрыта от внешнего мира через:
[private mllib]
Итак, с точки зрения внешнего мира/публичного API, как мы получаем доступ к этим примитивам?
Некоторые из них уже выставлены: например. сумма квадратов:
/**
* Returns the squared distance between two Vectors.
* @param v1 first Vector.
* @param v2 second Vector.
* @return squared distance between two Vectors.
*/
def sqdist(v1: Vector, v2: Vector): Double = {
...
}
Однако выбор таких доступных методов ограничен и фактически не включает в себя основные операции, включая элементное добавление, вычитание, умножение и т.д.
Итак, вот лучшее, что я мог видеть:
- Преобразовать векторы на ветер:
- Выполнение векторных операций на ветру
- Преобразование назад с бриза на mllib Vector
Вот пример кода:
val v1 = Vectors.dense(1.0, 2.0, 3.0)
val v2 = Vectors.dense(4.0, 5.0, 6.0)
val bv1 = new DenseVector(v1.toArray)
val bv2 = new DenseVector(v2.toArray)
val vectout = Vectors.dense((bv1 + bv2).toArray)
vectout: org.apache.spark.mllib.linalg.Vector = [5.0,7.0,9.0]
Ответ 2
В следующем коде представлены методы "какBreeze" и "Breeze" из Spark. Это решение поддерживает SparseVector
в отличие от использования vector.toArray
. Обратите внимание, что Spark может изменить свой API в будущем и уже переименовал toBreeze
в asBreeze
.
package org.apache.spark.mllib.linalg
import breeze.linalg.{Vector => BV}
import org.apache.spark.sql.functions.udf
/** expose vector.toBreeze and Vectors.fromBreeze
*/
object VectorUtils {
def fromBreeze(breezeVector: BV[Double]): Vector = {
Vectors.fromBreeze( breezeVector )
}
def asBreeze(vector: Vector): BV[Double] = {
// this is vector.asBreeze in Spark 2.0
vector.toBreeze
}
val addVectors = udf {
(v1: Vector, v2: Vector) => fromBreeze( asBreeze(v1) + asBreeze(v2) )
}
}
С помощью этого можно сделать df.withColumn("xy", addVectors($"x", $"y"))
.