让 Q
是Spark中的分布式行矩阵,我想用其Transpose Q
的跨产品 Q'
。
但是,尽管一个行矩阵确实具有 multiply()
方法,但它只能接受本地矩阵作为参数。
代码插图(Scala):
val phi = new RowMatrix(phiRDD) // phiRDD is an instance of RDD[Vector]
val phiTranspose = transposeRowMatrix(phi) // transposeRowMatrix()
// returns the transpose of a RowMatrix
val crossMat = ? // phi * phiTranspose
请注意,我想执行 2 分布式rowmatrix 不是与本地的dot产品。
一种解决方案是使用IndexedRowMatrix
如下:
val phi = new IndexedRowMatrix(phiRDD) // phiRDD is an instance of RDD[IndexedRow]
val phiTranspose = transposeMatrix(phi) // transposeMatrix()
// returns the transpose of a Matrix
val crossMat = phi.toBlockMatrix().multiply( phiTranspose.toBlockMatrix()
).toIndexedRowMatrix()
但是,我想使用诸如 tallSkinnyQR()
之类的行矩阵 - 方法,这意味着我使用.toRowMatrix()
方法:
crossMat
转换为RowMatrix val crossRowMat = crossMat.toRowMatrix()
最后我可以应用
crossRowMat.tallSkinnyQR()
但是,此过程包括分布式矩阵的类型之间的许多转换,根据我从MLLIB编程指南中理解的内容,这很昂贵:
选择合适的格式来存储大型和分布式矩阵。将分布式矩阵转换为其他格式可能需要全局洗牌,这很昂贵。
请有人详细说明。
仅支持矩阵乘法的分布式矩阵是 BlockMatrices
。您必须相应地转换数据 - 人工指数足够好:
new IndexedRowMatrix(
rowMatrix.rows.zipWithIndex.map(x => IndexedRow(x._2, x._1))
).toBlockMatrix match { case m => m.multiply(m.transpose) }
我使用了此页面上列出的算法,该算法将乘法问题从点产品移动到分布式标量产品问题,通过使用向量外部产品:
两个向量之间的外产物是标量产品 第二个向量具有第一个向量中的所有元素,从而导致 矩阵
我自己创建的乘法函数(可以更优化),最终是这样的。
def multiplyRowMatrices(m1: RowMatrix, m2: RowMatrix)(implicit ctx: SparkSession): RowMatrix = {
// Zip m1 columns with m2 rows
val m1Cm2R = transposeRowMatrix(m1).rows.zip(m2.rows)
// Apply scalar product between each entry in m1 vector with m2 row
val scalar = m1Cm2R.map{
case(column:DenseVector,row:DenseVector) => column.toArray.map{
columnValue => row.toArray.map{
rowValue => columnValue*rowValue
}
}
}
// Add all the resulting matrices point wisely
val sum = scalar.reduce{
case(matrix1,matrix2) => matrix1.zip(matrix2).map{
case(array1,array2)=> array1.zip(array2).map{
case(value1,value2)=> value1+value2
}
}
}
new RowMatrix(ctx.sparkContext.parallelize(sum.map(array=> Vectors.dense(array))))
}
之后,我测试了这两种方法 - 我自己的功能并使用块矩阵 - 在一台计算机上使用300*10矩阵
使用我自己的功能:
val PhiMat = new RowMatrix(phi)
val TphiMat = transposeRowMatrix(PhiMat)
val product = multiplyRowMatrices(PhiMat,TphiMat)
使用矩阵变换:
val MatRow = new RowMatrix(phi)
val MatBlock = new IndexedRowMatrix(MatRow.rows.zipWithIndex.map(x => IndexedRow(x._2, x._1))).toBlockMatrix()
val TMatBlock = MatBlock.transpose
val productMatBlock = MatBlock.multiply(TMatBlock)
val productMatRow = productMatBlock.toIndexedRowMatrix().toRowMatrix()
第一种方法 1作业带有 5个阶段,并采用 2S> 2S 总共完成。第二种方法跨越 4个作业,三个带有一个阶段和的三个带有两个阶段的方法 ,然后服用 0.323S 总共。同样,第二种方法优于第一个方法,相对于洗牌读/写大小。
但是,我仍然对MLLIB编程指南的声明感到困惑:
选择合适的格式以存放大型和 分布式矩阵。将分布式矩阵转换为不同 格式可能需要全球洗牌,这很昂贵。