问题:
设计一个高效、快速的命名管道客户端-服务器框架。
当前状态:
我已经有了久经沙场的生产测试框架。它速度很快,但每个管道连接使用一个线程,如果有很多客户端,线程数量可能会很快增加。我已经使用了可以根据需要扩展的智能线程池(实际上是任务池)。
我已经对管道使用了OVERLAPED模式,但后来我用WaitForSingleObject或WaitForMultipleObjects进行了阻塞,所以这就是为什么我在服务器端上每个连接需要一个线程
所需解决方案:
客户端是很好的,但在服务器端,我希望每个客户端请求只使用一个线程,而不是每个连接。因此,与其在客户端的整个生命周期中使用一个线程(连接/断开连接),我会为每个任务使用一个螺纹。所以,只有当客户端请求数据时,才需要。
我在MSDN上看到一个例子,它使用OVERLAPED结构的数组,然后使用WaitForMultipleObjects来等待它们。我觉得这个设计不好。我在这里看到两个问题。首先,您必须维护一个可能增长得相当大的阵列,并且删除操作成本高昂。其次,您有很多事件,每个数组成员都有一个事件。
我还看到了完成端口,如CreateIoCompletionPort和GetQueuedCompletionStatus,但我看不出它们有什么更好的。
我想要的是ReadFileEx和WriteFileEx所做的事情,它们调用回调例程当操作完成时。这是一种真正的异步编程风格。但问题是ConnectNamedPipe不支持这一点,而且我看到线程需要处于可警报状态,您需要调用一些*Ex函数才能实现这一点。
那么,如何最好地解决这样的问题呢
以下是MSDN的操作方法:http://msdn.microsoft.com/en-us/library/windows/desktop/aa365603(v=vs.85).aspx
我看到这种方法的问题是,如果WaitForMultipleObjects的限制是64个句柄,我看不出如何同时连接100个客户端。当然,我可以在每次请求后断开管道,但我们的想法是像在TCP服务器中一样,拥有一个永久的客户端连接,并在整个生命周期中跟踪客户端,每个客户端都有唯一的ID和特定于客户端的数据。
理想的伪代码应该是这样的:
repeat
// wait for the connection or for one client to send data
Result = ConnectNamedPipe or ReadFile or Disconnect;
case Result of
CONNECTED: CreateNewClient; // we create a new client
DATA: AssignWorkerThread; // here we process client request in a thread
DISCONNECT: CleanupAndDeleteClient // release the client object and data
end;
until Aborted;
通过这种方式,我们只有一个侦听器线程可以接受connect/disconnect/onData事件。线程池(工作线程)只处理实际的请求。通过这种方式,5个工作线程可以为许多已连接的客户端提供服务。
p.S。我当前的代码应该不重要。我用Delphi编写了这个代码,但它是纯WinAPI,所以语言无关紧要。
编辑:
目前,IOCP看起来像是解决方案:
I/O完成端口为在多处理器上处理多个异步I/O请求系统当进程创建I/O完成端口时,系统为唯一目的是为这些请求提供服务。处理许多并发的进程异步I/O请求可以通过将I/O完成端口与预先分配的线程结合使用池,而不是在收到I/O请求时创建线程。
若服务器必须处理64个以上的事件(读/写),则使用WaitForMultipleObjects的任何解决方案都不可行。这就是微软在Windows中引入IO完成端口的原因。它可以使用最合适数量的线程(通常是处理器/内核的数量)来处理大量的IO操作。
IOCP的问题是很难正确实施。隐藏的问题像地雷一样在现场传播:[1],[2](第3.6节)。我建议使用一些框架。谷歌搜索为Delphi开发人员提供了一个名为Indy的东西。也许还有其他人。
在这一点上,如果这意味着编码我自己的IOCP实现,我将忽略对命名管道的要求。这不值得悲伤。
我认为您忽略了在任何给定时间只需要几个正在侦听的命名管道实例。一旦管道实例已经连接,您就可以剥离该实例并创建一个新的侦听实例来替换它
使用MAXIMUM_WAIT_OBJECTS
(或更少)侦听命名管道实例,可以有一个专用于使用WaitForMultipleObjectsEx
进行侦听的线程。同一线程还可以使用ReadFileEx
、WriteFileEx
和APC来处理其余的I/O。工作线程将APC排队到I/O线程,以便启动I/O,I/O线程可以使用任务池返回结果(以及让工作线程了解新连接)。
I/O线程主函数看起来像这样:
create_events();
for (index = 0; index < MAXIMUM_WAIT_OBJECTS; index++) new_pipe_instance(i);
for (;;)
{
if (service_stopping && active_instances == 0) break;
result = WaitForMultipleObjectsEx(MAXIMUM_WAIT_OBJECTS, connect_events,
FALSE, INFINITE, TRUE);
if (result == WAIT_IO_COMPLETION)
{
continue;
}
else if (result >= WAIT_OBJECT_0 &&
result < WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS)
{
index = result - WAIT_OBJECT_0;
ResetEvent(connect_events[index]);
if (GetOverlappedResult(
connect_handles[index], &connect_overlapped[index],
&byte_count, FALSE))
{
err = ERROR_SUCCESS;
}
else
{
err = GetLastError();
}
connect_pipe_completion(index, err);
continue;
}
else
{
fail();
}
}
唯一真正复杂的是,当您调用ConnectNamedPipe
时,它可能会返回ERROR_PIPE_CONNECTED
以指示调用立即成功,或者如果调用立即失败,则返回除ERROR_IO_PENDING
以外的错误。在这种情况下,您需要重置事件,然后处理连接:
void new_pipe(ULONG_PTR dwParam)
{
DWORD index = dwParam;
connect_handles[index] = CreateNamedPipe(
pipe_name,
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_MESSAGE | PIPE_WAIT | PIPE_ACCEPT_REMOTE_CLIENTS,
MAX_INSTANCES,
512,
512,
0,
NULL);
if (connect_handles[index] == INVALID_HANDLE_VALUE) fail();
ZeroMemory(&connect_overlapped[index], sizeof(OVERLAPPED));
connect_overlapped[index].hEvent = connect_events[index];
if (ConnectNamedPipe(connect_handles[index], &connect_overlapped[index]))
{
err = ERROR_SUCCESS;
}
else
{
err = GetLastError();
if (err == ERROR_SUCCESS) err = ERROR_INVALID_FUNCTION;
if (err == ERROR_PIPE_CONNECTED) err = ERROR_SUCCESS;
}
if (err != ERROR_IO_PENDING)
{
ResetEvent(connect_events[index]);
connect_pipe_completion(index, err);
}
}
connect_pipe_completion
函数将在任务池中创建一个新任务来处理新连接的管道实例,然后将一个APC排队以调用new_pipe
,从而在相同索引处创建新的侦听管道。
一旦关闭了现有的管道实例,就可以重用它们,但在这种情况下,我认为不值得这么麻烦。