如何从 Amazon SQS 加载流数据?



我使用Spark 2.2.0。

如何使用 pyspark 将 Amazon SQS 流馈送到 Spark 结构化流?

这个问题试图通过创建自定义接收器来回答非结构化流和 scala 的问题。
在 pyspark 中可能出现类似的事情吗?

spark.readStream 
.format("s3-sqs") 
.option("fileFormat", "json") 
.option("queueUrl", ...) 
.schema(...) 
.load()

根据上面的数据砖,接收器可用于S3-SQS文件源。但是,仅对于 SQS 如何一种方法。

我尝试从 AWS-SQS-Receive_Message 理解以接收消息。但是,如何直接将流发送到火花流尚不清楚。

我对 Amazon SQS一无所知,但是"如何使用 pyspark 将 Amazon SQS 流提供给 Spark 结构化流">对于任何外部消息传递系统或使用 Spark Structured Stream(又名 Spark"Streams")的数据源都是不可能的。

在Spark Structured Streaming中,当Spark定期拉入数据时,情况正好相反(类似于Kafka Consumer API的工作方式,即它不给数据拉入数据)。

换句话说,Spark "Streams" 只是来自 Amazon SQS 中"队列"的消息的另一种使用者。

每当我被要求将外部系统与Spark"Streams"集成时,我都会开始使用客户端/消费者API为系统编写客户端。

获得它后,下一步是使用上面的示例客户端代码为外部系统(例如 Amazon SQS)开发自定义流源。

开发自定义流式处理Source时,必须执行以下步骤:

  1. 编写一个实现Source特征的 Scala 类

  2. 使用具有完全限定类名的文件向 Spark SQL 注册 Scala 类(自定义SourceMETA-INF/services/org.apache.spark.sql.sources.DataSourceRegister或在format中使用完全限定类名

拥有自定义流式处理源是一个由两部分组成的开发,开发源(并选择性地将其注册到 Spark SQL)并通过format方法在 Spark 结构化流应用程序(Python 中)中使用它。

相关内容

  • 没有找到相关文章