Spark 结构化流式处理监视器顶级文件夹,但动态写入单独的表



我有一个结构//storage-layer/raw/__SOME_FOLDERS__的 s3 存储桶。EG://storage-layer/raw/GTest//storage-layer/raw/HTest。在这些文件夹中,还可能有一些其他文件夹,例如raw/GTest/abcraw/HTest/xyz.abc文件夹不会重叠,xyz来自GTestHTest

我成功地设置了一个 Spark 结构化流来监视传入parquet文件的raw/GTest/abc,并将结果写入控制台。

def process_row(df, epoch_id):
df.show()

# Structured Streaming 
(
self.spark
.readStream
.format("parquet")
.option("maxFilesPerTrigger", 20)            
.option("inferSchema", "true")
.load("s3a://storage-layer/raw/GTest/abc/*")
.writeStream
.format("console")
.outputMode("append")
.trigger(processingTime="5 seconds")
# .foreachBatch(process_row)
.start()
.awaitTermination()
)

我的问题是,如何设置 1 个结构化流应用程序以从上层文件夹读取流:storage-layer/raw/*对其进行一些处理,并将其保存到 s3 中完全不同的文件夹/存储桶中?

我已经看了上面的foreachBatch,但我不确定如何设置它以实现最终结果。我收到错误消息Unable to infer schema for Parquet. It must be specified manually.

最终结果示例:

  • 将 Parquet 文件保存到 S3storage-layer/raw/GTest/abc->结构化流式传输 + 处理成storage-layer/processed/GTest/abc作为 Parquet 文件。

  • 保存到 S3storage-layer/raw/HTest/xyz的镶木地板文件 ->结构化流式传输 + 处理成storage-layer/processed/HTest/xyz作为镶木地板文件。

  • 对于Unable to infer the schema for Parquet. It must be specified manually.Spark 流无法自动推断架构,正如我们在静态读取中看到的那样。因此,需要以编程方式s3a://storage-layer/raw/*或存储在外部文件中为数据显式提供架构。看看这个。
  • 您有两个不同的源位置,因此需要两个readStream。如果storage-layer/raw/*中的数据具有相同的架构,并且您希望仅使用一个readStream来实现它,则在写入过程中包含一个额外的字段作为stream_source_path,并且在storage-layer/raw/*写入数据的进程应填充此字段。因此,现在您的流媒体应用程序知道正在从哪个源位置数据读取,您可以根据单个readStream的值派生stream_source_path两个数据帧。
  • 现在可以将上述两个数据帧写入单独的接收器。
  • Spark 对File接收器具有现成的支持,并且你想要以parquet格式写入数据。因此,您不需要foreachforeachbatch实施。

代码片段 -


val schemaObj = new Schema.Parser().parse(avsc_schema_file)
val schema = SchemaConverters.toSqlType(schemaObj).dataType.asInstanceOf[StructType]
val stream = sparkSession.readStream
.schema(schema)
.format("parquet")
.option("cleanSource","archive")
.option("maxFilesPerTrigger", "1")
.option("sourceArchiveDir","storage-layer/streaming_source_archive/")
.option("latestFirst", value = true)
.load("s3a://storage-layer/raw/*")
val df_abc = stream.filter(col("stream_source_path") === "storage-layer/raw/GTest/abc")
val df_xyz = stream.filter(col("stream_source_path") === "storage-layer/raw/GTest/xyz")
df_abc.writeStream
.format("parquet")        
.option("path", "storage-layer/processed/GTest/abc")
.option("checkpointLocation", "storage-layer/streaming_checkpoint/GTest/abc")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
df_xyz.writeStream 
.format("parquet")        
.option("path", "storage-layer/processed/GTest/xyz")
.option("checkpointLocation", "storage-layer/streaming_checkpoint/GTest/xyz")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
sparkSession.streams.active.foreach(x => x.awaitTermination())

相关内容

最新更新