我一直在尝试使用模拟器测试与Cloud PubSub的交互。它将消息发布到主题,但接收器不会被触发。以下是代码处理:
func TestPubSubEmulator(t *testing.T) {
ctx := context.Background()
topic, sub, err := CreateTestTopicAndSubscription(ctx, "project-id", "topic-id")
if err != nil {
t.Fatal(err)
}
cctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
var messageRecieved int32
sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
t.Log(m.Data)
atomic.AddInt32(&messageRecieved, 1)
m.Ack()
})
topic.Publish(ctx, &pubsub.Message{
Data: []byte("Hello World"),
})
time.Sleep(5 * time.Second)
t.Log(messageRecieved)
if messageRecieved != 1 {
t.Fatal("Message was never sent")
}
}
这也是创建主题和订阅的代码:
func CreateTestTopicAndSubscription(ctx context.Context, projectID, topicID string)
(*pubsub.Topic, *pubsub.Subscription, error) {
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return nil, nil, fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic, err := client.CreateTopic(ctx, topicID)
if err != nil {
return nil, nil, fmt.Errorf("CreateTopic: %v", err)
}
// Create a new subscription to the created topic and ensure it never expires.
sub, err := client.CreateSubscription(ctx, topicID, pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 10 * time.Second,
ExpirationPolicy: time.Duration(0),
})
if err != nil {
return nil, nil, err
}
return topic, sub, nil
}
我目前正在尝试从另一个程序发送消息,看看它是否被触发。
很抱歉我没有提前更新这个问题。我发现问题是由指向订阅的指针引起的。它没有在听信息。我需要创建一个指向订阅的新指针,以侦听更改。
这是的概念
// Create a new subscription to the created topic and ensure it never expires.
sub, err := client.CreateSubscription(ctx, topicID, pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 10 * time.Second,
ExpirationPolicy: time.Duration(0),
})
if err != nil {
return nil, nil, err
}
...
// This subscription won't work for some reason
sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
t.Log(m.Data)
atomic.AddInt32(&messageRecieved, 1)
m.Ack()
})
相反,应该先实现它,然后创建一个新指针来侦听它。
client.CreateSubscription(ctx, subId, pubsub.SubscriptionConfig{Topic: topic})
// This subscription would be able to receive messages
sub := client.Subscription(subId)
sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
t.Log(m.Data)
atomic.AddInt32(&messageRecieved, 1)
m.Ack()
})