我正在编写一个Spark应用程序,用于查找在时间范围内访问次数最多的n个URL。但是此作业会继续运行,并且需要数小时才能在 ES 中为实例389451
记录。我想减少这个时间。
我正在从 弹性搜索 火花中读到波纹管
val df = sparkSession.read
.format("org.elasticsearch.spark.sql")
.load(date + "/" + business)
.withColumn("ts_str", date_format($"ts", "yyyy-MM-dd HH:mm:ss")).drop("ts").withColumnRenamed("ts_str", "ts")
.select(selects.head, selects.tail:_*)
.filter($"ts" === ts)
.withColumn("url", split($"uri", "\?")(0)).drop("uri").withColumnRenamed("url", "uri").cache()
在上面的DF中,我正在从ElasticSearch读取和过滤。此外,我正在从 URI 中删除查询参数。
然后我正在做分组
var finalDF = df.groupBy("col1","col2","col3","col4","col5","uri").agg(sum("total_bytes").alias("total_bytes"), sum("total_req").alias("total_req"))
然后我正在运行一个窗口函数
val partitionBy = Seq("col1","col2","col3","col4","col5")
val window = Window.partitionBy(partitionBy.head, partitionBy.tail:_*).orderBy(desc("total_req"))
finalDF = finalDF.withColumn("rank", rank.over(window)).where($"rank" <= 5).drop("rank")
然后我正在写最终DF给卡桑德拉
finalDF.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "table", "keyspace" -> "keyspace")).mode(SaveMode.Append).save()
我在 ES 群集中有 4 个数据节点,我的 Spark 机器是 16 核 64GB RAM VM。请帮我找到问题所在。
读取后保留数据帧可能是个好主意,因为您将在排名函数中使用很多次。