GCP云功能未正确接收/确认PubSub消息



我在谷歌云平台中设置了一些数据处理工作流。这些位置处理物理地址并返回一些关于它们的度量。工作流使用云函数和PubSub流的组合。

工作流中有一个谷歌云功能,一些消息不会从触发流中提取,或者会被多次提取。我知道这是意料之中的事。然而,这种情况经常发生。足够了,这导致一些地方夸大了10倍,而其他几个地方却没有结果。

我认为callback函数没有正确确认消息,但我不确定应该有什么不同才能获得更可靠的消息拾取和确认。欢迎提出任何建议。

检索度量的GCP Cloud函数由PubSub流触发,并执行retrieve_location函数,将数据发送到不同的PubSub。retrieve_location函数如下所示:

def retrieve_location(event, context):
auth_flow()
project_id = <my project id>
subscription_name = <my subscription name>
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
def callback(message):
message.ack()
message_obj = message.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)

subscriber.subscribe(subscription_path, callback=callback)

get_metrics函数从每条消息中获取有效载荷,检索一些数据并将其发送到另一个流。这个功能似乎如预期的那样工作。

def get_metrics(loc):
<... retrieve and process data, my_data is the object that gets sent to the next stream ...>
project_id = <my project id>
topic_name = <my topic name>
topic_id = <my topic id>
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
try:
publisher.publish(topic_path, data=my_data.encode('utf-8'))
except Exception as exc:
print("topic publish failed: ", exc)

与其在云功能中设置第二个Pub/Sub订阅者,不如创建一个订阅该主题的后台功能,直接处理有效负载,例如:

def get_metrics_background_function(event, context):
message_obj = event.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)

您可能会将使用Cloud Pub/Sub来触发Cloud Function与直接通过Cloud Pub/Sub-客户端库使用Pub/Sub混为一谈。一般来说,你会想做一个或另一个。

如果您创建的订阅是通过云功能完成的,那么retrieve_location功能并没有真正接收和处理消息。相反,它所做的是启动一个订阅者客户端,然后很快关闭,因为subscriber.subscribe将运行到完成,因此您的函数将完成执行。

如果该函数正在启动一个客户端到触发云函数的同一订阅,那么它实际上不会做任何事情,因为基于云函数的订阅使用推送模型,而客户端库应该与拉取模型一起使用。

您要么想在retrieve_location中直接执行callback中的操作,使用事件作为消息(正如Dustin所描述的(,要么想用客户端库(例如,在GCE上(设置一个持久订阅者,该订阅者实例化并在其上调用subscribe

相关内容

  • 没有找到相关文章

最新更新