我有一个简单的shell脚本,它连接到GCP并尝试从主题中提取Pub/Sub消息。
启动时,它会检查是否存在任何消息,如果存在,则执行简单操作,然后确认消息并循环。
看起来是这样的:
while [ 1 ]
do
gcloud pubsub subscriptions pull...
// Do something
gcloud pubsub subscriptions ack ...
done
它不会随机提取消息:它们留在队列中,不会被提取。
因此,我们试图在收到消息时添加一个while循环,比如5重试,以避免这些问题更好地工作,但并不完美。我也觉得这有点寒酸。。。
这个问题发生在另一个项目上,该项目从脚本shell迁移到Java(出于其他原因(,我们在那里使用了拉订阅,现在它在这些项目上运行得很好!
我们可能做错了什么,但我不知道。。。
我读到,有时gcloud提取的消息比pubsub队列上的实际消息要少:
https://cloud.google.com/sdk/gcloud/reference/pubsub/subscriptions/pull
但它必须至少拉一个。。。在我们的案例中,没有消息是随机提取的。
这里有什么需要改进的地方吗?
通常,依赖使用gcloud
来检索消息并对其进行处理的shell脚本并不是使用Cloud Pub/Sub的有效方法。值得注意的是,pull中缺少返回的消息是,而不是表示缺少消息;这只是意味着消息不能在pull请求的截止日期之前返回。gcloud subscriptions pull
命令将returnImmediately
属性(请参阅pull文档中的信息(设置为true
,这基本上意味着,如果内存中没有可以快速访问的消息,则不会返回任何消息。该属性已被弃用,不应设置为true
,因此我们可能需要在gcloud
中研究更改。
您最好使用客户端库编写订阅服务器,该库可以设置流并不断检索消息。如果您打算只定期运行此操作,那么您可以编写一个读取消息的作业,并在消息未收到并关闭后等待一段时间。同样,这并不能保证所有可用的消息都会被消耗掉,但在大多数情况下都是这样。
Java中的这个版本看起来是这样的:
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.atomic.AtomicLong;
import org.joda.time.DateTime;
/** A basic Pub/Sub subscriber for purposes of demonstrating use of the API. */
public class Subscriber implements MessageReceiver {
private final String PROJECT_NAME = "my-project";
private final String SUBSCRIPTION_NAME = "my-subscription";
private com.google.cloud.pubsub.v1.Subscriber subscriber;
private AtomicLong lastReceivedTimestamp = new AtomicLong(0);
private Subscriber() {
ProjectSubscriptionName subscription =
ProjectSubscriptionName.of(PROJECT_NAME, SUBSCRIPTION_NAME);
com.google.cloud.pubsub.v1.Subscriber.Builder builder =
com.google.cloud.pubsub.v1.Subscriber.newBuilder(subscription, this);
try {
this.subscriber = builder.build();
} catch (Exception e) {
System.out.println("Could not create subscriber: " + e);
System.exit(1);
}
}
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
// Process message
lastReceivedTimestamp.set(DateTime.now().getMillis());
consumer.ack();
}
private void run() {
subscriber.startAsync();
while (true) {
long now = DateTime.now().getMillis();
long currentReceived = lastReceivedTimestamp.get();
if (currentReceived > 0 && ((now - currentReceived) > 30000)) {
subscriber.stopAsync();
break;
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
System.out.println("Error while waiting for completion: " + e);
}
}
System.out.println("Subscriber has not received message in 30s. Stopping.");
subscriber.awaitTerminated();
}
public static void main(String[] args) {
Subscriber s = new Subscriber();
s.run();
System.exit(0);
}
}