每秒接收20个SocketAsyncEventArgs



TCP服务器是使用SocketAsyncEventArgs和它的异步方法作为Windows服务开发的。Main的开头有两行代码:

ThreadPool.SetMaxThreads(15000, 30000);
ThreadPool.SetMinThreads(10000, 20000);

都返回true(返回值被记录)。现在有2000到3000个客户端开始向这个服务器发送消息,并且它开始接受连接(我计算了连接的数量,结果和预期的一样——有一个连接池)。服务器进程的线程数将从~2050增长到~3050。到目前为止一切顺利!

现在有一个Received方法,它将在ReceiveAsync返回true或SocketAsyncEventArgs的Completed事件后被调用。

这里问题开始了:不管有多少客户端连接,发送多少消息,一秒钟内最多调用20次Received !随着客户端数量的增加,这个数字(20)下降到~10。

环境:TCP服务器和客户端在同一台机器上被模拟。我在两台机器上测试了代码,一台有2核CPU和4GB RAM,另一台有8核CPU和12GB RAM。没有数据丢失(尚未),有时我在每个接收操作中接收多个消息。这很好。但是如何才能增加接收操作的数量呢?

关于实现的附加说明:代码很大,包含了许多不同的逻辑。总的描述是:我有一个SocketAsyncEventArgs来接受新的连接。效果很好。现在,对于每个新接受的连接,我创建一个新的SocketAsyncEventArgs来接收数据。我把这个(SocketAsyncEventArgs创建接收)在一个池。它不会被重用,但它的UserToken被用于跟踪连接;例如,那些连接被断开或那些连接没有发送任何数据7分钟将被关闭和处置(SocketAsyncEventArgs的AcceptSocket将被关闭(两者),关闭和处置,SocketAsyncEventArgs对象本身也是如此)。下面是一个执行这些任务的Sudo类,但所有其他逻辑和日志记录以及错误检查和其他任何东西都被删除,以使其简单明了(也许这样更容易发现有问题的代码):

class Sudo
{
    Socket _listener;
    int _port = 8797;
    public Sudo()
    {
        var ipEndPoint = new IPEndPoint(IPAddress.Any, _port);
        _listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        _listener.Bind(ipEndPoint);
        _listener.Listen(100);
        Accept(null);
    }
    void Accept(SocketAsyncEventArgs acceptEventArg)
    {
        if (acceptEventArg == null)
        {
            acceptEventArg = new SocketAsyncEventArgs();
            acceptEventArg.Completed += AcceptCompleted;
        }
        else acceptEventArg.AcceptSocket = null;
        bool willRaiseEvent = _listener.AcceptAsync(acceptEventArg); ;
        if (!willRaiseEvent) Accepted(acceptEventArg);
    }
    void AcceptCompleted(object sender, SocketAsyncEventArgs e)
    {
        Accepted(e);
    }
    void Accepted(SocketAsyncEventArgs e)
    {
        var acceptSocket = e.AcceptSocket;
        var readEventArgs = CreateArg(acceptSocket);
        var willRaiseEvent = acceptSocket.ReceiveAsync(readEventArgs);
        Accept(e);
        if (!willRaiseEvent) Received(readEventArgs);
    }
    SocketAsyncEventArgs CreateArg(Socket acceptSocket)
    {
        var arg = new SocketAsyncEventArgs();
        arg.Completed += IOCompleted;
        var buffer = new byte[64 * 1024];
        arg.SetBuffer(buffer, 0, buffer.Length);
        arg.AcceptSocket = acceptSocket;
        arg.SocketFlags = SocketFlags.None;
        return arg;
    }
    void IOCompleted(object sender, SocketAsyncEventArgs e)
    {
        switch (e.LastOperation)
        {
            case SocketAsyncOperation.Receive:
                Received(e);
                break;
            default: break;
        }
    }
    void Received(SocketAsyncEventArgs e)
    {
        if (e.SocketError != SocketError.Success || e.BytesTransferred == 0 || e.Buffer == null || e.Buffer.Length == 0)
        {
            // Kill(e);
            return;
        }
        var bytesList = new List<byte>();
        for (var i = 0; i < e.BytesTransferred; i++) bytesList.Add(e.Buffer[i]);
        var bytes = bytesList.ToArray();
        Process(bytes);
        ReceiveRest(e);
        Perf.IncOp();
    }
    void ReceiveRest(SocketAsyncEventArgs e)
    {
        e.SocketFlags = SocketFlags.None;
        for (int i = 0; i < e.Buffer.Length; i++) e.Buffer[i] = 0;
        e.SetBuffer(0, e.Buffer.Length);
        var willRaiseEvent = e.AcceptSocket.ReceiveAsync(e);
        if (!willRaiseEvent) Received(e);
    }
    void Process(byte[] bytes) { }
}

速度变慢的原因是这些线程中的每一个都需要上下文切换,这是一个相对昂贵的操作。添加的线程越多,花在上下文切换上的CPU比例就越大,而不是花在实际代码上。

您以一种相当奇怪的方式遇到了这个每个客户端一个线程的瓶颈。服务器端异步的全部意义在于减少线程的数量——不是每个客户端有一个线程,理想情况下,系统中的每个逻辑处理器只有一两个线程。

你发布的异步代码看起来很好,所以我只能猜测你的Process方法有一些容易忽视的非异步,阻塞I/O在它,即数据库或文件访问。当I/O阻塞时,. net线程池检测到这一点并自动启动一个新线程——它基本上在这里失去控制,Process中的I/O作为瓶颈。

一个异步管道真的需要100%异步才能从中获得任何显著的好处。一半会让你编写复杂的代码,执行起来和简单的同步代码一样差。

如果你绝对不能使Process方法纯异步,你可能有一些运气假装它。让事情在队列中等待一个小的、有限大小的线程池处理。

相关内容

  • 没有找到相关文章

最新更新