如何根据哈希映射过滤RDD



我是使用Spark和scala的新手,但我必须解决以下问题:我有一个包含行的 ORC 文件,我必须根据来自哈希映射的特定条件进行检查。

我以这种方式构建了包含 120,000 个条目的哈希映射(文件名,时间戳((getTimestamp 返回Option[Long]类型(:

val tgzFilesRDD = sc.textFile("...")
val fileNameTimestampRDD = tgzFilesRDD.map(itr => {
    (itr, getTimestamp(itr))
})
val fileNameTimestamp = fileNameTimestampRDD.collect.toMap

并检索包含 600 万个条目的 RDD,如下所示:

val sessionDataDF = sqlContext.read.orc("...")
case class SessionEvent(archiveName: String, eventTimestamp: Long)
val sessionEventsRDD = sessionDataDF.as[SessionEvent].rdd

并进行检查:

val sessionEventsToReport = sessionEventsRDD.filter(se => {
    val timestampFromFile = fileNameTimestamp.getOrElse(se.archiveName, None)
    se.eventTimestamp < timestampFromFile.getOrElse[Long](Long.MaxValue)
})

这是正确且高性能的方法吗?是否建议使用缓存?地图fileNameTimestamp是否会被洗牌到处理分区的集群?

fileNameTimestamp 将针对每个任务进行序列化,并且有 120,000 个条目,它可能非常昂贵。 应广播大型对象并引用广播变量:

val fileNameTimestampBC = sc.broadcast(fileNameTimestampRDD.collect.toMap)

现在,这些对象中只有一个将运送给每个工作人员。 也不需要下拉到 RDD API,因为数据集 API 有一个过滤方法:

val sessionEvents = sessionDataDF.as[SessionEvent]
val sessionEventsToReport = sessionEvents.filter(se => {
    val timestampFromFile = fileNameTimestampBC.value.getOrElse(se.archiveName, None)
    se.eventTimestamp < timestampFromFile.getOrElse[Long](Long.MaxValue)
})

collect编辑的fileNameTimestamp映射存在于Spark Master Node上。为了在查询中像这样有效地引用,工作节点需要有权访问它。这是通过广播完成的。

实质上,您重新发现了广播哈希连接:您将使用 tgzFilesRDD 加入 sessionEventsRDD 以获得对可选时间戳的访问权限,然后相应地进行过滤。

使用 RDD 时,需要显式编码加入策略。数据帧/数据集 API 有一个查询优化器,可以为你做出选择。您还可以显式要求 API 在后台使用上述广播加入技术。您可以在此处找到这两种方法的示例。

让我知道这是否足够清楚:)

相关内容

  • 没有找到相关文章

最新更新