Create RDDs from main RDD



i有一个rdd(rdd [(string,itoserable [event])],该密钥代表一年中一个月,值是在此期间发生的数百万事件一个月。

我想循环浏览每个密钥并创建密钥事件的RDD。然后,我想为本月事件的每一天创建一个事件RDD,以便我可以将它们发送到相关的S3位置("目录"结构是bucketName/年/月/日)。

问题是,似乎您无法在另一个RDD的范围内创建RDD。因此,我不确定如何在不必将整个主RDD加载到内存的情况下实现我想要的东西(当然会吹出驾驶员的记忆并首先击败使用Spark的点)。

也许有一种方法可以实现我想要使用Spark的目标,我只是不知道它知道并且希望这里有人可以帮助您。

这是我目前拥有的代码:

 private def store(
    eventsByMonth: RDD[(String, Iterable[Event])]
  )(
    implicit sqlContext: SQLContext
  ): Try[Unit] =
    Try(
      eventsByMonth
        .foreach {
          case (_, events: Iterable[Event]) =>
            writeToS3Files(sqlContext.sparkContext.parallelize(events.toSeq))
        }
    )
  private def writeToS3Files(events: RDD[Event])(
    implicit sqlContext: SQLContext
  ): Try[Unit] =
    Try(
      // outputFilePath will contain the day that these events are related to.
      events.groupBy(_.outputFilePath).foreach {
        case (filePath: String, eventsForFile: Iterable[Event]) =>
          writeToS3File(filePath, sqlContext.sparkContext.parallelize(eventsForFile.toSeq))
      }
    )
  private def writeToS3File(filePath: String, events: RDD[Event]): Try[Unit] = {
    val fileNameWithPath = s"${filePath}${UUID.randomUUID().toString}.gz"
    Try(events.saveAsTextFile(fileNameWithPath, classOf[GzipCodec]))
  }

我假设有某种方法可以确定一个月发生事件发生的一天(例如,(INT类型)是事件的属性)。

您可以将rdd [(string,itoserable [event]]转换为Pairdd [(k,v)],其中键(k)是事件发生的月份和每月和值(v)的月份和一天所有事件都发生在一个月的那一天。之后,您可以轻松地将数据转储到数据库中。

val eventsByMonthAndDate = eventsByMonth.flatMap { case (month, events) => events.map(e => ((month, e.day), e)) }
eventsByMonthAndDate.groupby(_._1).foreach(writeToDB)

最新更新