我使用Spark 2.2.0。
如何使用 pyspark 将 Amazon SQS 流馈送到 Spark 结构化流?
这个问题试图通过创建自定义接收器来回答非结构化流和 scala 的问题。
在 pyspark 中可能出现类似的事情吗?
spark.readStream
.format("s3-sqs")
.option("fileFormat", "json")
.option("queueUrl", ...)
.schema(...)
.load()
根据上面的数据砖,接收器可用于S3-SQS文件源。但是,仅对于 SQS 如何一种方法。
我尝试从 AWS-SQS-Receive_Message 理解以接收消息。但是,如何直接将流发送到火花流尚不清楚。
我对 Amazon SQS一无所知,但是"如何使用 pyspark 将 Amazon SQS 流提供给 Spark 结构化流">对于任何外部消息传递系统或使用 Spark Structured Stream(又名 Spark"Streams")的数据源都是不可能的。
在Spark Structured Streaming中,当Spark定期拉入数据时,情况正好相反(类似于Kafka Consumer API的工作方式,即它不给数据拉入数据)。
换句话说,Spark "Streams" 只是来自 Amazon SQS 中"队列"的消息的另一种使用者。
每当我被要求将外部系统与Spark"Streams"集成时,我都会开始使用客户端/消费者API为系统编写客户端。
获得它后,下一步是使用上面的示例客户端代码为外部系统(例如 Amazon SQS)开发自定义流源。
开发自定义流式处理Source
时,必须执行以下步骤:
编写一个实现
Source
特征的 Scala 类使用具有完全限定类名的文件向 Spark SQL 注册 Scala 类(自定义
Source
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
或在format
中使用完全限定类名
拥有自定义流式处理源是一个由两部分组成的开发,开发源(并选择性地将其注册到 Spark SQL)并通过format
方法在 Spark 结构化流应用程序(Python 中)中使用它。