结构化流式处理性能和清除镶木地板文件



我正在使用Spark结构化流从Kafka获取流数据。我需要聚合各种指标(比如 6 个指标(并编写为镶木地板文件。我确实看到指标 1 和指标 2 之间存在巨大的延迟。例如,如果指标 1 最近更新,则衡量指标 2 是一个小时前的数据。如何提高此性能以并行工作?

另外,我编写了应该由另一个应用程序读取的镶木地板文件。如何不断清除旧的镶木地板信息?我应该有不同的应用程序吗?

Dataset<String> lines_topic = spark.readStream().format("kafka").option("kafka.bootstrap.servers", bootstrapServers) 
Dataset<Row> data= lines_topic.select(functions.from_json(lines_topic.col("value"), schema).alias(topics)); data.withWatermark(---).groupBy(----).count(); query = data.writeStream().format("parquet").option("path",---).option("truncate", "false").outputMode("append").option("checkpointLocation", checkpointFile).start();

由于每个查询都独立于其他查询运行,因此您需要确保为每个查询提供足够的资源来执行。可能发生的情况是,如果您使用的是默认的FIFO调度程序,则所有触发器都将按顺序运行,而不是并行运行。

正如此处所述,您应该在 SparkContext 上设置一个 FAIR 调度程序,然后为每个查询定义新的池。

// Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").format("parquet").start(path1)
// Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").format("orc").start(path2)

此外,在清除旧的镶木地板文件方面,您可能希望对数据进行分区,然后根据需要定期删除旧分区。否则,如果所有数据都写入同一输出路径,则不能只删除行。

最新更新