如何计算Apache Spark中两个分布式Rowmatrix的点产品



Q 是Spark中的分布式行矩阵,我想用其Transpose 计算 Q 的跨产品 Q'

但是,尽管一个行矩阵确实具有 multiply() 方法,但它只能接受本地矩阵作为参数。

代码插图(Scala):

val phi = new RowMatrix(phiRDD)            // phiRDD is an instance of RDD[Vector]
val phiTranspose = transposeRowMatrix(phi) // transposeRowMatrix()
                                           // returns the transpose of a RowMatrix
val crossMat = ?                           // phi * phiTranspose

请注意,我想执行 2 分布式rowmatrix 不是与本地的dot产品。

一种解决方案是使用IndexedRowMatrix如下:

val phi = new IndexedRowMatrix(phiRDD)  // phiRDD is an instance of RDD[IndexedRow]
val phiTranspose = transposeMatrix(phi) // transposeMatrix()
                                        // returns the transpose of a Matrix
val crossMat = phi.toBlockMatrix().multiply( phiTranspose.toBlockMatrix()
                                             ).toIndexedRowMatrix()

但是,我想使用诸如 tallSkinnyQR() 之类的行矩阵 - 方法,这意味着我使用.toRowMatrix()方法:

,我将crossMat转换为RowMatrix
val crossRowMat = crossMat.toRowMatrix()

最后我可以应用

crossRowMat.tallSkinnyQR()

但是,此过程包括分布式矩阵的类型之间的许多转换,根据我从MLLIB编程指南中理解的内容,这很昂贵:

选择合适的格式来存储大型和分布式矩阵。将分布式矩阵转换为其他格式可能需要全局洗牌,这很昂贵。

请有人详细说明。

仅支持矩阵乘法的分布式矩阵是 BlockMatrices。您必须相应地转换数据 - 人工指数足够好:

new IndexedRowMatrix(
  rowMatrix.rows.zipWithIndex.map(x => IndexedRow(x._2,  x._1))
).toBlockMatrix match { case m => m.multiply(m.transpose) }

我使用了此页面上列出的算法,该算法将乘法问题从点产品移动到分布式标量产品问题,通过使用向量外部产品:

两个向量之间的外产物是标量产品 第二个向量具有第一个向量中的所有元素,从而导致 矩阵

我自己创建的乘法函数(可以更优化),最终是这样的。

def multiplyRowMatrices(m1: RowMatrix, m2: RowMatrix)(implicit ctx: SparkSession): RowMatrix = {
 // Zip m1 columns with m2 rows
val m1Cm2R = transposeRowMatrix(m1).rows.zip(m2.rows)
// Apply scalar product between each entry in m1 vector with m2 row
val scalar = m1Cm2R.map{
case(column:DenseVector,row:DenseVector) => column.toArray.map{
  columnValue => row.toArray.map{
    rowValue => columnValue*rowValue
  }
 }
}
// Add all the resulting matrices point wisely
val sum = scalar.reduce{
case(matrix1,matrix2) => matrix1.zip(matrix2).map{
  case(array1,array2)=> array1.zip(array2).map{
    case(value1,value2)=> value1+value2
  }
 }
}
new RowMatrix(ctx.sparkContext.parallelize(sum.map(array=> Vectors.dense(array))))
}

之后,我测试了这两种方法 - 我自己的功能并使用块矩阵 - 在一台计算机上使用300*10矩阵

使用我自己的功能:

val PhiMat = new RowMatrix(phi)
val TphiMat = transposeRowMatrix(PhiMat)
val product = multiplyRowMatrices(PhiMat,TphiMat)

使用矩阵变换:

val MatRow = new RowMatrix(phi)
val MatBlock = new IndexedRowMatrix(MatRow.rows.zipWithIndex.map(x => IndexedRow(x._2,  x._1))).toBlockMatrix()
val TMatBlock = MatBlock.transpose
val productMatBlock = MatBlock.multiply(TMatBlock)
val productMatRow = productMatBlock.toIndexedRowMatrix().toRowMatrix()

第一种方法 1作业带有 5个阶段,并采用 2S> 2S 总共完成。第二种方法跨越 4个作业三个带有一个阶段的三个带有两个阶段的方法 ,然后服用 0.323S 总共。同样,第二种方法优于第一个方法,相对于洗牌读/写大小。

但是,我仍然对MLLIB编程指南的声明感到困惑:

选择合适的格式以存放大型和 分布式矩阵。将分布式矩阵转换为不同 格式可能需要全球洗牌,这很昂贵。

相关内容

  • 没有找到相关文章

最新更新