预排序输入上的火花特征向量变换



我在HDFS上的制表符分隔文件中有一些数据,如下所示:

label | user_id | feature
------------------------------
  pos | 111     | www.abc.com
  pos | 111     | www.xyz.com
  pos | 111     | Firefox
  pos | 222     | www.example.com
  pos | 222     | www.xyz.com
  pos | 222     | IE
  neg | 333     | www.jkl.com
  neg | 333     | www.xyz.com
  neg | 333     | Chrome

我需要转换它以为每个user_id创建一个特征向量来训练org.apache.spark.ml.classification.NaiveBayes模型。

我目前的方法基本上如下:

  1. 将原始数据加载到数据帧中
  2. 使用 StringIndexer 为要素编制索引
  3. 转到RDD和分组user_id并将特征索引映射到稀疏向量中。

踢球者是这个...数据已按user_id预先排序。 利用这一点的最佳方式是什么? 想到可能会发生多少不必要的工作,我感到很痛苦。

如果一些代码有助于理解我当前的方法,以下是地图的本质:

val featurization = (vals: (String,Iterable[Row])) => {
  // create a Seq of all the feature indices
  // Note: the indexing was done in a previous step not shown
  val seq = vals._2.map(x => (x.getDouble(1).toInt,1.0D)).toSeq
  // create the sparse vector
  val featureVector = Vectors.sparse(maxIndex, seq)
  // convert the string label into a Double
  val label = if (vals._2.head.getString(2) == "pos") 1.0 else 0.0
  (label, vals._1, featureVector)
}
d.rdd
  .groupBy(_.getString(1))
  .map(featurization)
  .toDF("label","user_id","features")

让我们从你的另一个问题开始

如果我在磁盘上的数据保证按将用于组聚合或减少的键进行预排序,Spark 有什么办法可以利用这一点吗?

这要看情况。如果您应用的操作可以从映射端聚合中受益,那么通过预排序数据而无需进一步干预代码,您可以获得很多收益。共享相同键的数据应位于相同的分区上,并且可以在随机播放之前在本地聚合。

不幸的是,在这种特定情况下,它不会有太大帮助。即使您启用了地图端聚合(groupBy(Key)不使用是,因此您需要自定义实现)或聚合特征向量(您会在我的回答中找到一些示例 如何定义自定义聚合函数来对一列向量求和?)也没有太多收获。您可以在这里和那里节省一些工作,但您仍然必须在节点之间传输所有索引。

如果你想获得更多,你必须做更多的工作。我可以看到您可以利用现有订单的两种基本方法:

  1. 使用自定义 Hadoop 输入格式仅生成完整的记录(标签、id、所有特征),而不是逐行读取数据。如果数据的每个 id 具有固定的行数,您甚至可以尝试使用NLineInputFormat并在之后将mapPartitions应用于聚合记录。

    这绝对是更详细的解决方案,但不需要在 Spark 中进行额外的洗牌。

  2. 像往常一样读取数据,但使用自定义分区程序进行groupBy。据我所知,使用rangePartitioner应该可以正常工作,但为了确保您可以尝试以下过程:

    • 使用 mapPartitionsWithIndex 查找每个分区的最小/最大 ID。
    • 创建分区程序,该分区在当前(第 i<)分区上保持最小 <= ids 最大值,并将最大值推送到分区 i + 1
    • 使用此分区程序进行groupBy(Key)

    这可能是更友好的解决方案,但至少需要一些洗牌。如果要移动的预期记录数很少(每个分区 <<#records 个),您甚至可以使用 mapPartitionsbroadcast * 在不洗牌的情况下处理这个问题,尽管在实践中进行分区可能更有用且更便宜。


* 您可以使用类似于这样的方法:https://stackoverflow.com/a/33072089/1560062

最新更新