TCP服务器是使用SocketAsyncEventArgs和它的异步方法作为Windows服务开发的。Main的开头有两行代码:
ThreadPool.SetMaxThreads(15000, 30000);
ThreadPool.SetMinThreads(10000, 20000);
都返回true(返回值被记录)。现在有2000到3000个客户端开始向这个服务器发送消息,并且它开始接受连接(我计算了连接的数量,结果和预期的一样——有一个连接池)。服务器进程的线程数将从~2050增长到~3050。到目前为止一切顺利!
现在有一个Received方法,它将在ReceiveAsync返回true或SocketAsyncEventArgs的Completed事件后被调用。
这里问题开始了:不管有多少客户端连接,发送多少消息,一秒钟内最多调用20次Received !随着客户端数量的增加,这个数字(20)下降到~10。
环境:TCP服务器和客户端在同一台机器上被模拟。我在两台机器上测试了代码,一台有2核CPU和4GB RAM,另一台有8核CPU和12GB RAM。没有数据丢失(尚未),有时我在每个接收操作中接收多个消息。这很好。但是如何才能增加接收操作的数量呢?
关于实现的附加说明:代码很大,包含了许多不同的逻辑。总的描述是:我有一个SocketAsyncEventArgs来接受新的连接。效果很好。现在,对于每个新接受的连接,我创建一个新的SocketAsyncEventArgs来接收数据。我把这个(SocketAsyncEventArgs创建接收)在一个池。它不会被重用,但它的UserToken被用于跟踪连接;例如,那些连接被断开或那些连接没有发送任何数据7分钟将被关闭和处置(SocketAsyncEventArgs的AcceptSocket将被关闭(两者),关闭和处置,SocketAsyncEventArgs对象本身也是如此)。下面是一个执行这些任务的Sudo类,但所有其他逻辑和日志记录以及错误检查和其他任何东西都被删除,以使其简单明了(也许这样更容易发现有问题的代码):
class Sudo
{
Socket _listener;
int _port = 8797;
public Sudo()
{
var ipEndPoint = new IPEndPoint(IPAddress.Any, _port);
_listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_listener.Bind(ipEndPoint);
_listener.Listen(100);
Accept(null);
}
void Accept(SocketAsyncEventArgs acceptEventArg)
{
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += AcceptCompleted;
}
else acceptEventArg.AcceptSocket = null;
bool willRaiseEvent = _listener.AcceptAsync(acceptEventArg); ;
if (!willRaiseEvent) Accepted(acceptEventArg);
}
void AcceptCompleted(object sender, SocketAsyncEventArgs e)
{
Accepted(e);
}
void Accepted(SocketAsyncEventArgs e)
{
var acceptSocket = e.AcceptSocket;
var readEventArgs = CreateArg(acceptSocket);
var willRaiseEvent = acceptSocket.ReceiveAsync(readEventArgs);
Accept(e);
if (!willRaiseEvent) Received(readEventArgs);
}
SocketAsyncEventArgs CreateArg(Socket acceptSocket)
{
var arg = new SocketAsyncEventArgs();
arg.Completed += IOCompleted;
var buffer = new byte[64 * 1024];
arg.SetBuffer(buffer, 0, buffer.Length);
arg.AcceptSocket = acceptSocket;
arg.SocketFlags = SocketFlags.None;
return arg;
}
void IOCompleted(object sender, SocketAsyncEventArgs e)
{
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
Received(e);
break;
default: break;
}
}
void Received(SocketAsyncEventArgs e)
{
if (e.SocketError != SocketError.Success || e.BytesTransferred == 0 || e.Buffer == null || e.Buffer.Length == 0)
{
// Kill(e);
return;
}
var bytesList = new List<byte>();
for (var i = 0; i < e.BytesTransferred; i++) bytesList.Add(e.Buffer[i]);
var bytes = bytesList.ToArray();
Process(bytes);
ReceiveRest(e);
Perf.IncOp();
}
void ReceiveRest(SocketAsyncEventArgs e)
{
e.SocketFlags = SocketFlags.None;
for (int i = 0; i < e.Buffer.Length; i++) e.Buffer[i] = 0;
e.SetBuffer(0, e.Buffer.Length);
var willRaiseEvent = e.AcceptSocket.ReceiveAsync(e);
if (!willRaiseEvent) Received(e);
}
void Process(byte[] bytes) { }
}
速度变慢的原因是这些线程中的每一个都需要上下文切换,这是一个相对昂贵的操作。添加的线程越多,花在上下文切换上的CPU比例就越大,而不是花在实际代码上。
您以一种相当奇怪的方式遇到了这个每个客户端一个线程的瓶颈。服务器端异步的全部意义在于减少线程的数量——不是每个客户端有一个线程,理想情况下,系统中的每个逻辑处理器只有一两个线程。
你发布的异步代码看起来很好,所以我只能猜测你的Process
方法有一些容易忽视的非异步,阻塞I/O在它,即数据库或文件访问。当I/O阻塞时,. net线程池检测到这一点并自动启动一个新线程——它基本上在这里失去控制,Process
中的I/O作为瓶颈。
一个异步管道真的需要100%异步才能从中获得任何显著的好处。一半会让你编写复杂的代码,执行起来和简单的同步代码一样差。
如果你绝对不能使Process
方法纯异步,你可能有一些运气假装它。让事情在队列中等待一个小的、有限大小的线程池处理。