我正在尝试实现一个类似AWS或Azure队列的PubSub客户端,但是,我遇到了gcloud cpp-sdk的问题。
更新:删除了不必要的细节。
首先,所提供的示例无法开箱即用——我必须在session.cancel()
之前进行睡眠,否则消息不会得到确认。有没有一种可靠的方法可以等到ack((操作完成并检查其状态?至少我想确保服务器收到了我的请求。
此外,c++API似乎只提供了一个异步方法,不适合我的用例。
我需要实现以下接口,该接口通过依赖注入插入到更大的系统中。该系统在其他云上进行生产,因此我无法更改架构。只需要实现接口。
template<typename TItem>
class Queue{
public:
/*!
* Dequeues message from the queue
* Returns true on success
*/
virtual bool Dequeue( TItem & item) = 0;
/*!
* Discards(deletes) the item with from the cloud queue.
*/
virtual void Discard(const TReceipt & receipt) = 0;
};
队列的实际实现将由一个序列化器提供,该序列化器将TItem序列化为JSON并返回。
AWS和Azure SDK为每个退出队列的消息提供一个收据,以便我以后可以丢弃它。pubsub SDK的接收是绑定到会话的AckHandler对象。
一个明显错误的解决方案是保持会话打开,并在lambda中等待另一个condition_variable,直到下次调用Dequeue方法。然而,这看起来像是一个快速而肮脏的解决方案。使用Pub/Sub实现此功能的正确方法是什么?
是否有可靠的方法等待ack((操作完成并检查其状态?
不是,因为ack()
操作是最好的。即使您等待ack()
到达服务器,也不能保证消息不会被重新发送。是的,即使在成功的ack()
之后,该服务也可以重新发送该消息。
请注意,库会自动延长消息的租约,以避免在显式ack()
或nack()
之前重新发送,因此ack()
中的延迟不会影响正确性,除非在ack()
之后不久关闭应用程序。
AWS和Azure SDK为每个退出队列的消息提供一个收据,这样我以后就可以丢弃它,但我不知道如何使用pub/sub来做到这一点。
您可以在AckHandler
上调用nack()
来丢弃消息,这意味着消息将重新发送到另一个实例。这就是你所说的";丢弃";?
用Pub/Sub实现此功能的正确方法是什么?
嗯,我不确定我是否理解。我可以猜测Dequeue()
和Discard()
的语义,如果我猜错了,我很抱歉。无论如何,还不清楚这是否真的适用于任何类型?像TReceipt == int
?
需要注意的是,这里有很多猜测,你可以做这样的事情:
class PubsubBuffer {
private:
std::mutex mu_;
std::dequeue<std::pair<pubsub::Message, AckHandlerWrapper>> queue_;
public:
virtual bool Dequeue(pubsub::Message& item, AckHandlerWrapper& receipt) {
std::unique_lock<std::mutex> lk(mu_);
if (queue_.empty()) return false;
auto& f = queue_.front();
item = std::move(f.first);
receipt = std::move(.second);
queue_.pop_front();
return true;
}
virtual void Discard(AckHandlerWrapper const& receipt) {
std::move(receipt.ack_handler).ack();
}
void Push(pubsub::Message m, pubsub::AckHandler h) {
std::unique_lock<std::mutex> lk(mu_);
queue_.push_back(
std::make_pair(std::move(m), AckHandlerWrapper(std::move(h)));
}
};
std::shared_ptr<PubsubBuffer> F(pubsub::Subscriber s) {
auto buffer = std::make_shared<PubsubBuffer>();
auto handler = [buffer](pubsub::Message m, pubsbu::AckHandler h) {
buffer.Push(std::move(m), std::move(h));
};
return buffer;
}