我是使用Spark和scala的新手,但我必须解决以下问题:我有一个包含行的 ORC 文件,我必须根据来自哈希映射的特定条件进行检查。
我以这种方式构建了包含 120,000 个条目的哈希映射(文件名,时间戳((getTimestamp 返回Option[Long]
类型(:
val tgzFilesRDD = sc.textFile("...")
val fileNameTimestampRDD = tgzFilesRDD.map(itr => {
(itr, getTimestamp(itr))
})
val fileNameTimestamp = fileNameTimestampRDD.collect.toMap
并检索包含 600 万个条目的 RDD,如下所示:
val sessionDataDF = sqlContext.read.orc("...")
case class SessionEvent(archiveName: String, eventTimestamp: Long)
val sessionEventsRDD = sessionDataDF.as[SessionEvent].rdd
并进行检查:
val sessionEventsToReport = sessionEventsRDD.filter(se => {
val timestampFromFile = fileNameTimestamp.getOrElse(se.archiveName, None)
se.eventTimestamp < timestampFromFile.getOrElse[Long](Long.MaxValue)
})
这是正确且高性能的方法吗?是否建议使用缓存?地图fileNameTimestamp
是否会被洗牌到处理分区的集群?
fileNameTimestamp 将针对每个任务进行序列化,并且有 120,000 个条目,它可能非常昂贵。 应广播大型对象并引用广播变量:
val fileNameTimestampBC = sc.broadcast(fileNameTimestampRDD.collect.toMap)
现在,这些对象中只有一个将运送给每个工作人员。 也不需要下拉到 RDD API,因为数据集 API 有一个过滤方法:
val sessionEvents = sessionDataDF.as[SessionEvent]
val sessionEventsToReport = sessionEvents.filter(se => {
val timestampFromFile = fileNameTimestampBC.value.getOrElse(se.archiveName, None)
se.eventTimestamp < timestampFromFile.getOrElse[Long](Long.MaxValue)
})
您collect
编辑的fileNameTimestamp
映射存在于Spark Master Node上。为了在查询中像这样有效地引用,工作节点需要有权访问它。这是通过广播完成的。
实质上,您重新发现了广播哈希连接:您将使用 tgzFilesRDD 加入 sessionEventsRDD 以获得对可选时间戳的访问权限,然后相应地进行过滤。
使用 RDD 时,需要显式编码加入策略。数据帧/数据集 API 有一个查询优化器,可以为你做出选择。您还可以显式要求 API 在后台使用上述广播加入技术。您可以在此处找到这两种方法的示例。
让我知道这是否足够清楚:)