我有一个结构//storage-layer/raw/__SOME_FOLDERS__
的 s3 存储桶。EG://storage-layer/raw/GTest
和//storage-layer/raw/HTest
。在这些文件夹中,还可能有一些其他文件夹,例如raw/GTest/abc
、raw/HTest/xyz
.abc
文件夹不会重叠,xyz
来自GTest
或HTest
。
我成功地设置了一个 Spark 结构化流来监视传入parquet
文件的raw/GTest/abc
,并将结果写入控制台。
def process_row(df, epoch_id):
df.show()
# Structured Streaming
(
self.spark
.readStream
.format("parquet")
.option("maxFilesPerTrigger", 20)
.option("inferSchema", "true")
.load("s3a://storage-layer/raw/GTest/abc/*")
.writeStream
.format("console")
.outputMode("append")
.trigger(processingTime="5 seconds")
# .foreachBatch(process_row)
.start()
.awaitTermination()
)
我的问题是,如何设置 1 个结构化流应用程序以从上层文件夹读取流:storage-layer/raw/*
对其进行一些处理,并将其保存到 s3 中完全不同的文件夹/存储桶中?
我已经看了上面的foreachBatch
,但我不确定如何设置它以实现最终结果。我收到错误消息Unable to infer schema for Parquet. It must be specified manually.
最终结果示例:
将 Parquet 文件保存到 S3
storage-layer/raw/GTest/abc
->结构化流式传输 + 处理成storage-layer/processed/GTest/abc
作为 Parquet 文件。保存到 S3
storage-layer/raw/HTest/xyz
的镶木地板文件 ->结构化流式传输 + 处理成storage-layer/processed/HTest/xyz
作为镶木地板文件。
- 对于
Unable to infer the schema for Parquet. It must be specified manually.
Spark 流无法自动推断架构,正如我们在静态读取中看到的那样。因此,需要以编程方式s3a://storage-layer/raw/*
或存储在外部文件中为数据显式提供架构。看看这个。 - 您有两个不同的源位置,因此需要两个
readStream
。如果storage-layer/raw/*
中的数据具有相同的架构,并且您希望仅使用一个readStream
来实现它,则在写入过程中包含一个额外的字段作为stream_source_path
,并且在storage-layer/raw/*
写入数据的进程应填充此字段。因此,现在您的流媒体应用程序知道正在从哪个源位置数据读取,您可以根据单个readStream
的值派生stream_source_path
两个数据帧。 - 现在可以将上述两个数据帧写入单独的接收器。
- Spark 对
File
接收器具有现成的支持,并且你想要以parquet
格式写入数据。因此,您不需要foreach
或foreachbatch
实施。
代码片段 -
val schemaObj = new Schema.Parser().parse(avsc_schema_file)
val schema = SchemaConverters.toSqlType(schemaObj).dataType.asInstanceOf[StructType]
val stream = sparkSession.readStream
.schema(schema)
.format("parquet")
.option("cleanSource","archive")
.option("maxFilesPerTrigger", "1")
.option("sourceArchiveDir","storage-layer/streaming_source_archive/")
.option("latestFirst", value = true)
.load("s3a://storage-layer/raw/*")
val df_abc = stream.filter(col("stream_source_path") === "storage-layer/raw/GTest/abc")
val df_xyz = stream.filter(col("stream_source_path") === "storage-layer/raw/GTest/xyz")
df_abc.writeStream
.format("parquet")
.option("path", "storage-layer/processed/GTest/abc")
.option("checkpointLocation", "storage-layer/streaming_checkpoint/GTest/abc")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
df_xyz.writeStream
.format("parquet")
.option("path", "storage-layer/processed/GTest/xyz")
.option("checkpointLocation", "storage-layer/streaming_checkpoint/GTest/xyz")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
sparkSession.streams.active.foreach(x => x.awaitTermination())