我试图在我的python应用程序上使用pub子服务。当我运行代码时,由于某种原因,它会卡在最后一个发布者行上,代码永远不会结束。订阅者看起来很好。有人知道我的代码有什么问题吗?
出版者:
import os
from google.cloud import pubsub_v1
credentials_path = 'PATH/TO/THE/KEY.JSON'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path
publisher = pubsub_v1.PublisherClient()
topic_path = 'projects/PROJECT_NAME/topics/TOPIC_NAME'
# simple garbage text to check if it's working
data = 'A garden sensor is ready!'
data = data.encode('utf-8')
attributes = {
'sensorName': 'garden-001',
'temperature': '75.0',
'humidity': '60'
}
future = publisher.publish(topic_path, data, **attributes)
print(f'published message id {future.result()}') # here it is just waiting forever
用户:
import os
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
credentials_path = 'PATH/TO/THE/KEY.JSON'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path
subscriber = pubsub_v1.SubscriberClient()
subscription_path = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_NAME'
def callback(message):
print(f'Received message: {message}')
print(f'data: {message.data}')
if message.attributes:
print("Attributes:")
for key in message.attributes:
value = message.attributes.get(key)
print(f"{key}: {value}")
message.ack()
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback)
print(f'Listening for messages on {subscription_path}')
# wrap subscriber in a 'with' block to automatically call close() when done
with subscriber:
try:
streaming_pull_future.result()
except TimeoutError:
streaming_pull_future.cancel()
# block until the shutdown is complete
streaming_pull_future.result()
Google提供了不错的文档来使用它的服务,包括Pub/Sub,其中包括一个基本的Python示例,可以帮助你避免你的问题。
旁白:您的发布者和订阅者代码段在代码中静态地设置了GOOGLE_APPLICATION_CREDENTIALS
。别这样!在运行代码之前设置环境变量。这样,您就可以在不更改代码的情况下修改该值,但更重要的是,该值可以由运行时设置,例如Compute Engine.
下面是一个基于从环境中获得的应用程序默认凭证的代码的工作示例:
Q="74535931"
BILLING="[YOUR-BILLING-ID]"
PROJECT="$(whoami)-$(date %y%m%d)-${Q}"
gcloud projects create ${PROJECT}
gcloud beta billing projects link ${PROJECT}
--billing-account=${BILLING}
gcloud services enable pubsub.googleapis.com
--project=${PROJECT}
ACCOUNT=tester
EMAIL=${ACCOUNT}@${PROJECT}.iam.gserviceaccount.com
gcloud iam service-accounts create ${ACCOUNT}
--project=${PROJECT}
gcloud iam service-accounts keys create ${PWD}/${ACCOUNT}.json
--iam-account=${EMAIL}
gcloud projects add-iam-policy-binding ${PROJECT}
--member=serviceAccount:${EMAIL}
--role=roles/pubsub.editor
export GOOGLE_APPLICATION_CREDENTIALS=${PWD}/${ACCOUNT}.json
export PROJECT
export PUB="pub"
export SUB="sub"
gcloud pubsub topics create ${PUB}
--project=${PROJECT}
gcloud pubsub subscriptions create ${SUB}
--topic=${PUB}
--project=${PROJECT}
publish.py
:
import os
from google.cloud import pubsub_v1
project = os.getenv("PROJECT")
topic = os.getenv("PUB")
topic_path = f"projects/{project}/topics/{topic}"
data = 'A garden sensor is ready!'
data = data.encode('utf-8')
attributes = {
'sensorName': 'garden-001',
'temperature': '75.0',
'humidity': '60'
}
publisher = pubsub_v1.PublisherClient()
future = publisher.publish(topic_path, data, **attributes)
print(f'published message id {future.result()}')
subscribe.py
:
import os
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
project=os.getenv("PROJECT")
subscription=os.getenv("SUB")
subscription_path = f"projects/{project}/subscriptions/{subscription}"
def callback(message):
print(f'Received message: {message}')
print(f'data: {message.data}')
if message.attributes:
print("Attributes:")
for key in message.attributes:
value = message.attributes.get(key)
print(f"{key}: {value}")
message.ack()
subscriber = pubsub_v1.SubscriberClient()
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback)
print(f'Listening for messages on {subscription_path}')
with subscriber:
try:
streaming_pull_future.result()
except TimeoutError:
streaming_pull_future.cancel()
# block until the shutdown is complete
streaming_pull_future.result()
运行python3 subscribe.py
:
python3 subscribe.py
Listening for messages on projects/{project}/subscriptions/{sub}
Received message: Message {
data: b'A garden sensor is ready!'
ordering_key: ''
attributes: {
"humidity": "60",
"sensorName": "garden-001",
"temperature": "75.0"
}
}
data: b'A garden sensor is ready!'
Attributes:
humidity: 60
temperature: 75.0
sensorName: garden-001
在一个单独的窗口python3 publish.py
:
python3 publish.py
published message id 1234567890123456