与云pubsub模拟器的集成测试.从同一代码块发送和接收



我一直在尝试使用模拟器测试与Cloud PubSub的交互。它将消息发布到主题,但接收器不会被触发。以下是代码处理:

func TestPubSubEmulator(t *testing.T) {
ctx := context.Background()
topic, sub, err := CreateTestTopicAndSubscription(ctx, "project-id", "topic-id")
if err != nil {
t.Fatal(err)
}
cctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
var messageRecieved int32
sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
t.Log(m.Data)
atomic.AddInt32(&messageRecieved, 1)
m.Ack()
})
topic.Publish(ctx, &pubsub.Message{
Data: []byte("Hello World"),
})
time.Sleep(5 * time.Second)
t.Log(messageRecieved)
if messageRecieved != 1 {
t.Fatal("Message was never sent")
}
}

这也是创建主题和订阅的代码:

func CreateTestTopicAndSubscription(ctx context.Context, projectID, topicID string) 
(*pubsub.Topic, *pubsub.Subscription, error) {
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return nil, nil, fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic, err := client.CreateTopic(ctx, topicID)
if err != nil {
return nil, nil, fmt.Errorf("CreateTopic: %v", err)
}
// Create a new subscription to the created topic and ensure it never expires.
sub, err := client.CreateSubscription(ctx, topicID, pubsub.SubscriptionConfig{
Topic:            topic,
AckDeadline:      10 * time.Second,
ExpirationPolicy: time.Duration(0),
})
if err != nil {
return nil, nil, err
}
return topic, sub, nil
}

我目前正在尝试从另一个程序发送消息,看看它是否被触发。

很抱歉我没有提前更新这个问题。我发现问题是由指向订阅的指针引起的。它没有在听信息。我需要创建一个指向订阅的新指针,以侦听更改。

这是的概念

// Create a new subscription to the created topic and ensure it never expires.
sub, err := client.CreateSubscription(ctx, topicID, pubsub.SubscriptionConfig{
Topic:            topic,
AckDeadline:      10 * time.Second,
ExpirationPolicy: time.Duration(0),
})
if err != nil {
return nil, nil, err
}
...
// This subscription won't work for some reason
sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
t.Log(m.Data)
atomic.AddInt32(&messageRecieved, 1)
m.Ack()
})

相反,应该先实现它,然后创建一个新指针来侦听它。

client.CreateSubscription(ctx, subId, pubsub.SubscriptionConfig{Topic: topic})
// This subscription would be able to receive messages
sub := client.Subscription(subId)
sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
t.Log(m.Data)
atomic.AddInt32(&messageRecieved, 1)
m.Ack()
})

最新更新