增强ASIO和线程之间的消息传递



我正在设计一个websocket服务器,该服务器接收消息并将其保存到嵌入式数据库。为了阅读信息,我正在使用boost asio。为了将消息保存到嵌入式数据库中,我看到了几个选项:

  1. 在同一线程上收到消息后,立即同步保存消息
  2. 将消息异步保存在单独的线程上

我确信第二个答案就是我想要的。但是,我不知道如何将消息从套接字线程传递到IO线程。我看到以下选项:

  1. 每个线程使用一个io服务,并使用post函数在线程之间进行通信。在这里,我不得不担心锁争用。我应该吗
  2. 使用Linux域套接字在线程之间传递消息。据我所知,没有锁争用。在这里,我可能可以使用BOOST_ASIO_DISABLE_THREADS宏来获得一些性能提升

此外,我相信拥有多个IO线程会有所帮助,这些线程将以循环方式接收消息,并保存到嵌入式数据库中。

哪种体系结构的性能最好除了我提到的,还有其他选择吗?

需要注意的几点:

  • 消息的长度正好为8字节
  • 无法使用外部数据库。数据库必须嵌入正在运行的过程
  • 我正在考虑使用RocksDB作为嵌入式数据库

我认为您不想使用unix套接字,因为它总是需要系统调用并通过内核传递数据。这通常比线程间机制更适合作为进程间机制。

除非您的数据库API要求所有调用都来自同一个线程(我对此表示怀疑),否则您不必为其使用单独的boost::asio::io_service。相反,我会在现有的io_service实例上创建一个io_service::strand,并使用strand::dispatch()成员函数(而不是io_service::post())执行任何阻塞数据库任务。以这种方式使用strand可以确保最多有一个线程访问数据库时被阻止,从而使io_service实例中的所有其他线程都可以为非数据库任务提供服务。

为什么这可能比使用单独的io_service实例更好?一个优点是,拥有一个带有一组线程的单个实例的代码和维护稍微简单一些。另一个小优点是,如果可以的话(即,如果没有任务已经在strand中运行),使用strand::dispatch()将在当前线程中执行,这可以避免上下文切换。

对于最终的优化,我同意使用入队操作无法进行系统调用的专用队列可能是最快的。但是,考虑到生产者有网络i/o,消费者有磁盘i/o,我不认为队列的实现会成为瓶颈。

在进行基准测试/分析后,我发现facebook愚蠢的MPMCQueue实现是最快的,至少有50%的优势。如果我使用非阻塞写入方法,那么套接字线程几乎没有开销,IO线程仍然很忙。系统调用的数量也比其他队列实现少得多。

升压中具有cond变量的SPSC队列较慢。我不知道为什么。这可能和愚蠢队列使用的自适应旋转有关。

此外,消息传递(在本例中为UDP域套接字)的速度慢了几个数量级,尤其是对于较大的消息。这可能与两次复制数据有关。

您可能只需要一个io_service——通过提供boost::asio::io_service::run作为线程函数,您可以创建额外的线程来处理io_service内发生的事件。对于通过网络套接字接收来自客户端的8字节消息,这应该可以很好地扩展。

为了将消息存储在数据库中,它取决于数据库&界面如果它是多线程的,那么你也可以从接收消息的线程向数据库发送每条消息。否则,我可能会设置一个boost::lockfree::queue,其中一个读取器线程提取项目并将其发送到数据库,io_service线程在新消息到达时将其附加到队列中。

这是最有效的方法吗?我不知道。这绝对很简单,如果速度不够快,你可以为你提供一个基线。但我建议一开始不要设计更复杂的东西:你根本不知道自己是否需要它,除非你对你的系统有很多了解,否则实际上不可能说复杂的方法是否比简单的方法更好。

void Consumer( lockfree::queue<uint64_t> &message_queue ) {
// Connect to database...
while (!Finished) {
message_queue.consume_all( add_to_database ); // add_to_database is a Functor that takes a message
cond_var.wait_for( ... ); // Use a timed wait to avoid missing a signal.  It's OK to consume_all() even if there's nothing in the queue.
}
}
void Producer( lockfree::queue<uint64_t> &message_queue ) {
while (!Finished) {
uint64_t m = receive_from_network( );
message_queue.push( m );
cond_var.notify_all( );
}
}

假设在您的环境中使用cxx11的约束不太严格,我会尝试使用std::async对嵌入式数据库进行异步调用。

最新更新