我是否可以使用 Boost.Asio 同步从套接字读取,并在多线程 I/O 服务上超时



我有一个使用Boost.Asio进行TCP和UDP套接字通信的应用程序。我知道"Asio"中的"A"代表异步,因此该库倾向于鼓励您尽可能使用异步I/O。我有几种情况,同步套接字读取更可取。但是,与此同时,我想在所述接收呼叫上设置超时,因此读取不可能无限期阻塞。

这似乎是Boost.Asio用户中一个非常普遍的问题,以下过去的Stack Overflow问题涉及该主题:

  • C++ 提升 ASIO:如何在超时的情况下读/写?
  • asio::读取超时
  • 加速 ASIO 超时
  • 如何在升压 asio 中设置阻塞套接字的超时?

甚至可能还有更多。文档中甚至还有有关如何实现超时同步操作的示例。它们归结为将同步操作转换为异步操作,然后与asio::deadline_timer并行启动它。然后,计时器的过期处理程序可以在超时过期时取消异步读取。这看起来像这样(片段取自上面的链接示例(:

std::size_t receive(const boost::asio::mutable_buffer& buffer,
      boost::posix_time::time_duration timeout, boost::system::error_code& ec)
  {
    // Set a deadline for the asynchronous operation.
    deadline_.expires_from_now(timeout);
    // Set up the variables that receive the result of the asynchronous
    // operation. The error code is set to would_block to signal that the
    // operation is incomplete. Asio guarantees that its asynchronous
    // operations will never fail with would_block, so any other value in
    // ec indicates completion.
    ec = boost::asio::error::would_block;
    std::size_t length = 0;
    // Start the asynchronous operation itself. The handle_receive function
    // used as a callback will update the ec and length variables.
    socket_.async_receive(boost::asio::buffer(buffer),
        boost::bind(&client::handle_receive, _1, _2, &ec, &length));
    // Block until the asynchronous operation has completed.
    do io_service_.run_one(); while (ec == boost::asio::error::would_block);
    return length;
  }

这实际上是一个相对干净的解决方案:启动异步操作,然后手动轮询asio::io_service以一次执行一个异步处理程序,直到async_receive()完成或计时器过期。

但是,套接字的基础 I/O 服务已在一个或多个后台线程中运行的情况如何?在这种情况下,无法保证异步操作的处理程序将由上述代码片段中的前台线程运行,因此run_one()直到稍后执行可能不相关的处理程序才会返回。这将使套接字读取相当无响应。

asio::io_service有一个poll_one()函数,可以在不阻塞的情况下检查服务的队列,但我看不到在处理程序执行之前阻止前台线程(模拟同步调用行为(的好方法,除了没有后台线程正在执行的情况asio::io_service::run()已经。

看到了两个潜在的解决方案,我都不喜欢:

  1. 使用条件变量或类似的构造在启动异步操作后使前台线程块。在async_receive()调用的处理程序中,向条件变量发出信号以取消阻止线程。这会为每次读取引入一些锁定,我想避免这种情况,因为我想在 UDP 套接字读取上实现最大可能的吞吐量。否则,它是可行的,并且可能是我会做的,除非出现一种更好的方法。

  2. 确保套接字有自己的asio::io_service,该未由任何后台线程运行。这使得在需要的情况下更难将异步 I/O 与套接字一起使用。

对以安全方式完成此操作的其他方法有什么想法吗?


旁白:对于以前的 SO 问题,有一些建议使用 SO_RCVTIMEO 套接字选项来实现套接字读取超时的答案。这在理论上听起来很棒,但它似乎至少在我的平台上不起作用(Ubuntu 12.04,Boost v1.55(。我可以设置套接字超时,但它不会在 Asio 上产生所需的效果。相关代码/boost/asio/detail/impl/socket_ops.ipp

size_t sync_recvfrom(socket_type s, state_type state, buf* bufs,
    size_t count, int flags, socket_addr_type* addr,
    std::size_t* addrlen, boost::system::error_code& ec)
{
  if (s == invalid_socket)
  {
    ec = boost::asio::error::bad_descriptor;
    return 0;
  }
  // Read some data.
  for (;;)
  {
    // Try to complete the operation without blocking.
    signed_size_type bytes = socket_ops::recvfrom(
        s, bufs, count, flags, addr, addrlen, ec);
    // Check if operation succeeded.
    if (bytes >= 0)
      return bytes;
    // Operation failed.
    if ((state & user_set_non_blocking)
        || (ec != boost::asio::error::would_block
          && ec != boost::asio::error::try_again))
      return 0;
    // Wait for socket to become ready.
    if (socket_ops::poll_read(s, 0, ec) < 0)
      return 0;
  }
}

如果套接字读取超时,对上述recvfrom()的调用将返回EAGAINEWOULDBLOCK,这些被转换为boost::asio::error::try_againboost::asio::error::would_block。在这种情况下,上面的代码将调用 poll_read() 函数,对于我的平台来说,它看起来像:

int poll_read(socket_type s, state_type state, boost::system::error_code& ec)
{
  if (s == invalid_socket)
  {
    ec = boost::asio::error::bad_descriptor;
    return socket_error_retval;
  }
  pollfd fds;
  fds.fd = s;
  fds.events = POLLIN;
  fds.revents = 0;
  int timeout = (state & user_set_non_blocking) ? 0 : -1;
  clear_last_error();
  int result = error_wrapper(::poll(&fds, 1, timeout), ec);
  if (result == 0)
    ec = (state & user_set_non_blocking)
      ? boost::asio::error::would_block : boost::system::error_code();
  else if (result > 0)
    ec = boost::system::error_code();
  return result;
}

我剪掉了为其他平台有条件编译的代码。如您所见,如果套接字不是非阻塞套接字,它最终会无限超时调用poll(),因此阻塞直到套接字有数据要读取(并阻止超时尝试(。因此,SO_RCVTIMEO选项无效。

Boost.Asio对futures的支持可能提供了一个优雅的解决方案。 当异步操作提供 boost::asio::use_future 值作为其完成处理程序时,启动函数将返回一个将接收操作结果的std::future对象。 此外,如果操作以失败完成,则error_code将转换为system_error并通过 future 传递给调用方。

在 Boost.Asio C++11 Futures datytime 客户端示例中,一个专用线程运行 io_service ,主线程启动异步操作,然后同步等待操作完成,如下所示:

std::array<char, 128> recv_buf;
udp::endpoint sender_endpoint;
std::future<std::size_t> recv_length =
  socket.async_receive_from(
      boost::asio::buffer(recv_buf),
      sender_endpoint,
      boost::asio::use_future);
// Do other things here while the receive completes.
std::cout.write(
    recv_buf.data(),
    recv_length.get()); // Blocks until receive is complete.

使用 future s 时,实现超时同步读取的总体方法与以前相同。 不使用同步读取,而是使用异步读取并异步等待计时器。 唯一的小变化是,不是在io_service上阻塞或定期检查谓词,而是调用future::get()来阻止,直到操作完成成功或失败(例如超时(。

如果 C++11 不可用,则可以针对 Boost.Thread 的future自定义异步操作的返回类型,如本答案所示。

相关内容

  • 没有找到相关文章

最新更新