如何在apache spark中使用scala或python运行多线程作业



我正面临着一个与spark并发相关的问题,这阻止了我在生产中使用它,但我知道有一条出路。我正在尝试在700万用户上运行Spark ALS,使用订单历史记录查询10亿产品。首先,我取了一个不同用户的列表,然后在这些用户上运行一个循环来获得推荐,这是一个相当缓慢的过程,需要几天的时间才能获得所有用户的推荐。我尝试做笛卡尔用户和产品,一次得到所有的推荐,但再次提供给elasticsearch,我必须过滤和排序每个用户的记录,只有这样我才能提供给elasticsearch,供其他api使用。

所以请给我一个解决方案,这个解决方案在这种用例中是相当可扩展的,并且可以在生产中使用并提供实时建议。

下面是我的scala代码片段,它会让你知道我目前是如何解决这个问题的:

  //    buy_values -> RDD with Rating(<int user_id>, <int product_id>, <double rating>)
  def recommend_for_user(user: Int): Unit = {
      println("Recommendations for User ID: " + user);
      // Product IDs which are not bought by user 
      val candidates = buys_values
        .filter(x => x("customer_id").toString.toInt != user)
        .map(x => x("product_id").toString.toInt)
        .distinct().map((user, _))
      // find 30 products with top rating
      val recommendations = bestModel.get
        .predict(candidates)
        .takeOrdered(30)(Ordering[Double].reverse.on(x => x.rating))
      var i = 1
      var ESMap = Map[String, String]()
      recommendations.foreach { r =>
        ESMap += r.product.toString -> bitem_ids.value(r.product)
      }
      //  push to elasticsearch with user as id
      client.execute {
        index into "recommendation" / "items" id user fields ESMap
      }.await
      // remove candidate RDD from memory
      candidates.unpersist()
  }
  // iterate on each user to get recommendations for the user [slow process]
  user_ids.foreach(recommend_for_user)

很明显,程序中的瓶颈是对candidates的搜索。考虑到Spark架构,它严重限制了并行化的能力,并且为每个用户启动Spark作业增加了大量的开销。

假设典型的场景,有700万用户十亿产品,大多数情况下你可以预测整个产品范围减去用户已经购买的产品。至少在我看来,重要的问题是为什么要过滤。即使你推荐以前买过的产品,它真的有害吗?

除非您有非常严格的要求,否则我会简单地忽略这个问题并使用MatrixFactorizationModel.recommendProductsForUsers,它几乎可以为您完成所有工作,不包括数据导出。之后,你可以执行批量导出,你就可以开始了。

现在让我们假设您有一个明确的无重复策略。假设一个典型的用户只购买了相对少量的产品,您可以从为每个用户获取一组产品开始:

val userProdSet = buy_values
    .map{case (user, product, _) => (user, product)} 
    .aggregateByKey(Set.empty[Int])((s, e) => s + e, (s1, s2) => s1 ++ s2)

接下来你可以简单地映射userProdSet来得到预测:

// Number of predictions for each user
val nPred = 30;
userProdSet.map{case (user, prodSet) => {
    val recommended = model
         // Find recommendations for user
        .recommendProducts(_, nPred + prodSet.size))
        // Filter to remove already purchased 
        .filter(rating => !prodSet.contains(rating.product))
        // Sort and limit
        .sortBy(_.rating)
        .reverse
        .take(nPred)
    (user, recommended)
}}

您可以通过使用可变集合进行聚合和广播模型来进一步改进,但这是一般的想法。

如果user_ids中的用户数量低于整个集合(buy_values)中的用户数量,您可以简单地过滤userProdSet以仅保留用户的子集。

1.4具有用于生成所有推荐的建议,以便可以通过kv商店提供。

最新更新