如何将数据从 Google PubSub 主题流式传输到 PySpark(在 Google Cloud 上)



我有数据流到Google PubSub中的一个主题中。 我可以使用简单的Python代码看到该数据:

...
def callback(message):
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") + ": message = '" + message.data + "'")
message.ack()
future = subscriber.subscribe(subscription_name, callback)
future.result()

上面的python代码从Google PubSub主题(订阅者subscriber_name(接收数据,并将其写入终端,如预期的那样。 我想将主题中的相同数据流式传输到 PySpark(RDD 或数据帧(,以便我可以在 PySpark 中进行其他流式转换,例如窗口化和聚合,如下所述:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html。

该链接包含用于读取其他流媒体源(例如Kafka(的文档,但不包括Google PubSub。 有没有办法从Google PubSub流式传输到PySpark?

您可以使用Apache Bahir,它为Apache Spark提供扩展,包括Google Cloud Pub/Sub的连接器。

您可以从 Google CloudPlatform 找到一个示例,该示例使用 Spark on Kubernetes 从从 Google Cloud PubSub 主题接收的数据流中计算字数,并将结果写入 Google Cloud Storage (GCS( 存储桶。

还有另一个例子,它使用DStream在 Cloud Dataproc 上部署 Apache Spark 流应用程序,并处理来自 Cloud Pub/Sub 的消息。

你可以使用Apache Beam: https://beam.apache.org/

Apache Beam 对 Cloud Pub/Sub: https://beam.apache.org/documentation/io/built-in/有 Pyhton 支持

有一个Python SDK:https://beam.apache.org/documentation/sdks/python/

以及对Spark的支持:https://beam.apache.org/documentation/runners/capability-matrix/

我相信你可以使用这个:https://cloud.google.com/pubsub/lite/docs/samples/pubsublite-spark-streaming-from-pubsublite

您创建一个订阅并放入火花流中的选项。

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()
sdf = (
spark.readStream.format("pubsublite")
.option(
"pubsublite.subscription",
f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
)
.load()
)

最新更新