Матрица Transpose на RowMatrix в Spark
Предположим, что у меня есть RowMatrix.
- Как это сделать. Документация API, похоже, не имеет метода транспонирования.
- Матрица имеет метод transpose(). Но он не распространяется. Если у меня есть большая матрица, более высокая, чем память, как ее можно транспонировать?
-
Я преобразовал RowMatrix в DenseMatrix следующим образом
DenseMatrix Mat = new DenseMatrix(m,n,MatArr);
который требует преобразования RowMatrix в JavaRDD и преобразования JavaRDD в массив.
Есть ли другой удобный способ сделать преобразование?
Заранее спасибо
Ответы
Ответ 1
Вы правы: нет
RowMatrix.transpose()
метод. Вам нужно будет выполнить эту операцию вручную.
Ниже представлены версии нераспределенных/локальных:
def transpose(m: Array[Array[Double]]): Array[Array[Double]] = {
(for {
c <- m(0).indices
} yield m.map(_(c)) ).toArray
}
распределенная версия будет состоять из следующих строк:
origMatRdd.rows.zipWithIndex.map{ case (rvect, i) =>
rvect.zipWithIndex.map{ case (ax, j) => ((j,(i,ax))
}.groupByKey
.sortBy{ case (i, ax) => i }
.foldByKey(new DenseVector(origMatRdd.numRows())) { case (dv, (ix,ax)) =>
dv(ix) = ax
}
Предостережение: я не тестировал выше: у него будут ошибки. Но основной подход действителен - и аналогичен той работе, которую я делал в прошлом для небольшой библиотеки LinAlg для искры.
Ответ 2
Если кто-то заинтересован, я внедрил распространенную версию, предложенную @javadba.
def transposeRowMatrix(m: RowMatrix): RowMatrix = {
val transposedRowsRDD = m.rows.zipWithIndex.map{case (row, rowIndex) => rowToTransposedTriplet(row, rowIndex)}
.flatMap(x => x) // now we have triplets (newRowIndex, (newColIndex, value))
.groupByKey
.sortByKey().map(_._2) // sort rows and remove row indexes
.map(buildRow) // restore order of elements in each row and remove column indexes
new RowMatrix(transposedRowsRDD)
}
def rowToTransposedTriplet(row: Vector, rowIndex: Long): Array[(Long, (Long, Double))] = {
val indexedRow = row.toArray.zipWithIndex
indexedRow.map{case (value, colIndex) => (colIndex.toLong, (rowIndex, value))}
}
def buildRow(rowWithIndexes: Iterable[(Long, Double)]): Vector = {
val resArr = new Array[Double](rowWithIndexes.size)
rowWithIndexes.foreach{case (index, value) =>
resArr(index.toInt) = value
}
Vectors.dense(resArr)
}
Ответ 3
Вы можете использовать BlockMatrix, который может быть создан из IndexedRowMatrix:
BlockMatrix matA = (new IndexedRowMatrix(...).toBlockMatrix().cache();
matA.validate();
BlockMatrix matB = matA.transpose();
Затем можно легко вернуть в качестве IndexedRowMatrix. Это описано в искровой документации.
Ответ 4
Для очень большой и разреженной матрицы (например, тот, который вы получаете от извлечения текстовых функций) лучший и самый простой способ:
def transposeRowMatrix(m: RowMatrix): RowMatrix = {
val indexedRM = new IndexedRowMatrix(m.rows.zipWithIndex.map({
case (row, idx) => new IndexedRow(idx, row)}))
val transposed = indexedRM.toCoordinateMatrix().transpose.toIndexedRowMatrix()
new RowMatrix(transposed.rows
.map(idxRow => (idxRow.index, idxRow.vector))
.sortByKey().map(_._2))
}
Для не столь разреженной матрицы вы можете использовать BlockMatrix в качестве моста, как указано выше в ответе aletapool.
Однако ответ aletapool пропускает очень важный момент: когда вы начинаете с RowMaxtrix → IndexedRowMatrix → BlockMatrix → transpose → BlockMatrix → IndexedRowMatrix → RowMatrix, на последнем шаге (IndexedRowMatrix → RowMatrix) вам нужно сделай вид. Поскольку по умолчанию преобразование из IndexedRowMatrix в RowMatrix, индекс просто отбрасывается и порядок будет испорчен.
val data = Array(
MllibVectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
MllibVectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
MllibVectors.dense(4.0, 0.0, 0.0, 6.0, 7.0),
MllibVectors.sparse(5, Seq((2, 2.0), (3, 7.0))))
val dataRDD = sc.parallelize(data, 4)
val testMat: RowMatrix = new RowMatrix(dataRDD)
testMat.rows.collect().map(_.toDense).foreach(println)
[0.0,1.0,0.0,7.0,0.0]
[2.0,0.0,3.0,4.0,5.0]
[4.0,0.0,0.0,6.0,7.0]
[0.0,0.0,2.0,7.0,0.0]
transposeRowMatrix(testMat).
rows.collect().map(_.toDense).foreach(println)
[0.0,2.0,4.0,0.0]
[1.0,0.0,0.0,0.0]
[0.0,3.0,0.0,2.0]
[7.0,4.0,6.0,7.0]
[0.0,5.0,7.0,0.0]
Ответ 5
Это вариант предыдущего решения, но он работает с разреженной матрицей строк и при необходимости сохраняет транспонированную разреженность:
def transpose(X: RowMatrix): RowMatrix = {
val m = X.numRows ().toInt
val n = X.numCols ().toInt
val transposed = X.rows.zipWithIndex.flatMap {
case (sp: SparseVector, i: Long) => sp.indices.zip (sp.values).map {case (j, value) => (i, j, value)}
case (dp: DenseVector, i: Long) => Range (0, n).toArray.zip (dp.values).map {case (j, value) => (i, j, value)}
}.sortBy (t => t._1).groupBy (t => t._2).map {case (i, g) =>
val (indices, values) = g.map {case (i, j, value) => (i.toInt, value)}.unzip
if (indices.size == m) {
(i, Vectors.dense (values.toArray) )
} else {
(i, Vectors.sparse (m, indices.toArray, values.toArray))
}
}.sortBy(t => t._1).map (t => t._2)
new RowMatrix (transposed)
}
Надеюсь на эту помощь!
Ответ 6
Получение транспонирования RowMatrix в Java:
public static RowMatrix transposeRM(JavaSparkContext jsc, RowMatrix mat){
List<Vector> newList=new ArrayList<Vector>();
List<Vector> vs = mat.rows().toJavaRDD().collect();
double [][] tmp=new double[(int)mat.numCols()][(int)mat.numRows()] ;
for(int i=0; i < vs.size(); i++){
double[] rr=vs.get(i).toArray();
for(int j=0; j < mat.numCols(); j++){
tmp[j][i]=rr[j];
}
}
for(int i=0; i < mat.numCols();i++)
newList.add(Vectors.dense(tmp[i]));
JavaRDD<Vector> rows2 = jsc.parallelize(newList);
RowMatrix newmat = new RowMatrix(rows2.rdd());
return (newmat);
}