Pub子消息仍在队列中,但未提取



我有一个简单的shell脚本,它连接到GCP并尝试从主题中提取Pub/Sub消息。

启动时,它会检查是否存在任何消息,如果存在,则执行简单操作,然后确认消息并循环。

看起来是这样的:

while [ 1 ]
do
gcloud pubsub subscriptions pull...
// Do something
gcloud pubsub subscriptions ack ...
done

它不会随机提取消息:它们留在队列中,不会被提取。

因此,我们试图在收到消息时添加一个while循环,比如5重试,以避免这些问题更好地工作,但并不完美。我也觉得这有点寒酸。。。

这个问题发生在另一个项目上,该项目从脚本shell迁移到Java(出于其他原因(,我们在那里使用了拉订阅,现在它在这些项目上运行得很好!

我们可能做错了什么,但我不知道。。。

我读到,有时gcloud提取的消息比pubsub队列上的实际消息要少:

https://cloud.google.com/sdk/gcloud/reference/pubsub/subscriptions/pull

但它必须至少拉一个。。。在我们的案例中,没有消息是随机提取的。

这里有什么需要改进的地方吗?

通常,依赖使用gcloud来检索消息并对其进行处理的shell脚本并不是使用Cloud Pub/Sub的有效方法。值得注意的是,pull中缺少返回的消息是,而不是表示缺少消息;这只是意味着消息不能在pull请求的截止日期之前返回。gcloud subscriptions pull命令将returnImmediately属性(请参阅pull文档中的信息(设置为true,这基本上意味着,如果内存中没有可以快速访问的消息,则不会返回任何消息。该属性已被弃用,不应设置为true,因此我们可能需要在gcloud中研究更改。

您最好使用客户端库编写订阅服务器,该库可以设置流并不断检索消息。如果您打算只定期运行此操作,那么您可以编写一个读取消息的作业,并在消息未收到并关闭后等待一段时间。同样,这并不能保证所有可用的消息都会被消耗掉,但在大多数情况下都是这样。

Java中的这个版本看起来是这样的:

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.atomic.AtomicLong;
import org.joda.time.DateTime;
/** A basic Pub/Sub subscriber for purposes of demonstrating use of the API. */
public class Subscriber implements MessageReceiver {
private final String PROJECT_NAME = "my-project";
private final String SUBSCRIPTION_NAME = "my-subscription";
private com.google.cloud.pubsub.v1.Subscriber subscriber;
private AtomicLong lastReceivedTimestamp = new AtomicLong(0);
private Subscriber() {
ProjectSubscriptionName subscription =
ProjectSubscriptionName.of(PROJECT_NAME, SUBSCRIPTION_NAME);
com.google.cloud.pubsub.v1.Subscriber.Builder builder =
com.google.cloud.pubsub.v1.Subscriber.newBuilder(subscription, this);
try {
this.subscriber = builder.build();
} catch (Exception e) {
System.out.println("Could not create subscriber: " + e);
System.exit(1);
}
}
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
// Process message
lastReceivedTimestamp.set(DateTime.now().getMillis());
consumer.ack();
}
private void run() {
subscriber.startAsync();
while (true) {
long now = DateTime.now().getMillis();
long currentReceived = lastReceivedTimestamp.get();
if (currentReceived > 0 && ((now - currentReceived) > 30000)) {
subscriber.stopAsync();
break;
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
System.out.println("Error while waiting for completion: " + e);
}
}
System.out.println("Subscriber has not received message in 30s. Stopping.");
subscriber.awaitTerminated();
}
public static void main(String[] args) {
Subscriber s = new Subscriber();
s.run();
System.exit(0);
}
}

最新更新