正在读取spark结构流中的最新s3文件



我有一个spark结构的流式代码,它从s3 bucket中读取JSON文件并将其写回s3。输入文件路径格式:

val inputPath = s3://<path>/2022-08-26

输出文件路径格式:

val outputPath = s3://<path>/2022-08-26

代码:

val spark = SparkSession.builder().appName("raw_data").enableHiveSupport().getOrCreate()
val df = spark.readStream.option("startingPosition","earliest").schema(LogSchema).json(inputPath)

val query = df.
writeStream.
outputMode("append").
partitionBy("day").
format("parquet").
option("path", "s3://<path>/raw_data/data/").
option("checkpointLocation", "s3://<path>/raw_data/checkpoint/").
trigger(Trigger.ProcessingTime("300 seconds")).
start()

面临的问题:

  1. 我们想读取当天在s3 bucket分区上收到的最新文件(而不是旧文件(
  2. 在s3 bucket上写入文件应该在当天进行分区

请帮助我解决上述问题。

您可以在S3之上使用hive来获得这种行为,在S3中您可以按天/日期创建hive分区。您可以创建一个位置指向S3的配置单元表。然后,您可以基于hive表分区进行读写操作。在您的情况下,该分区将是最新的。

你可以参考这篇文章https://medium.com/analytics-vidhya/hive-using-s3-and-scala-af5524302758

最新更新