用于UDP服务器的C#可等待SocketAsyncEventArgs包装器;socket.ReceiveAsync返回无效的RemoteEndPoint



正如标题所说,我正在原始套接字上编写UDP服务器,并使用SocketAsyncEventArgs,因为我想快速编写一些东西。

我知道UdpClient是存在的,并且有更容易的方法来编写服务器,但我想学习如何正确使用SocketAsyncEventArgs以及socket.ReceiveFromAsync/socket.SendToAsync方法,以获得它们声称的"增强吞吐量"和"更好的可扩展性"(见SocketAsyncEventArgs的MSDN文档)。

我基本上遵循了MSDN的例子,因为我认为这是学习这些方法如何工作的一个不错的起点,但遇到了一些问题。服务器最初可以正常工作,能够回显接收到的字节,但会随机地"失败"(找不到更好的说法),无法从正确的地址接收字节:RemoteEndPoint会被填充为UDP占位符EndPoint {0.0.0.0:0},而不是正确的localhost客户端地址(例如127.0.0.1:7007)。显示问题的图像(控制台是波兰语的,对不起。请相信我,SocketException消息是"Desired Address在此上下文中无效")。

我曾经把MSDN示例中的代码整块去掉,只修改为socket.ReceiveFromAsync调用所填充的SocketAsyncEventArgs实例上的字段(依据MSDN文档 -> socket.ReceiveFromAsync文档),最终结果仍然相同。此外,这是一个间歇性的问题,而不是一个持续的问题。从我所注意到的情况来看,服务器不会一直出错。

到目前为止,我的想法是UdpServer的状态问题,UdpClient方面的一些不一致,或者对TaskCompletionSource的滥用。

编辑1:

我觉得我应该解决我为什么使用SocketAsyncEventArgs的问题。我完全理解发送和接收数据有更简单的方法。async/await套接字扩展是实现这一点的好方法,也是我最初的做法。我想将async/await与旧的api SocketArgs进行比较,看看这两种方法有多不同。


UdpClient、UdpServer和共享结构的代码如下所示。如果StackOverflow允许的话,我也可以尝试按需提供更多的代码。

谢谢你花时间帮助我。

测试代码

/// <summary>
/// Runs a loopback echo benchmark: a <see cref="UdpServer"/> bound to 127.0.0.1:12345 and a
/// <see cref="UdpClient"/> bound to port 7007 that sends a fixed message and waits for each echo.
/// </summary>
/// <returns>
/// A task that completes once both the client loop and the server loop have finished. The server loop has
/// no exit condition of its own, so in practice this only completes if the server fails.
/// </returns>
private static async Task TestNetworking()
{
    EndPoint serverEndPoint = new IPEndPoint(IPAddress.Loopback, 12345);

    // Task.Run unwraps the async lambda, so the returned task covers the whole loop. Task.Factory.StartNew
    // returned Task<Task>, whose outer task finished at the lambda's first await and hid later failures.
    // The server is started first so that the client's first packets have somewhere to go.
    Task serverTask = Task.Run(async () =>
    {
        try
        {
            SocketServer server = new UdpServer();
            if (server.Bind(serverEndPoint))
            {
                await server.StartAsync();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    });

    Task clientTask = Task.Run(async () =>
    {
        // Private copy: the client follows whatever endpoint the replies come from, and must not
        // mutate the variable the server lambda reads.
        EndPoint remoteEndPoint = serverEndPoint;
        SocketClient client = new UdpClient();
        bool bound = client.Bind(new IPEndPoint(IPAddress.Any, 7007));
        if (bound)
        {
            Console.WriteLine($"[Client] Bound client socket!");
        }
        if (!bound || !client.Connect(serverEndPoint))
        {
            return;
        }

        Console.WriteLine($"[Client] Connected to {serverEndPoint}!");
        byte[] message = Encoding.UTF8.GetBytes("Hello World!");
        Stopwatch stopwatch = new Stopwatch();
        const int packetsToSend = 1_000_000;
        for (int i = 0; i < packetsToSend; i++)
        {
            try
            {
                stopwatch.Start();
                await client.SendAsync(remoteEndPoint, message, SocketFlags.None);
                ReceiveResult result = await client.ReceiveAsync(remoteEndPoint, SocketFlags.None);
                remoteEndPoint = result.RemoteEndPoint;
                stopwatch.Stop();
            }
            catch (Exception ex)
            {
                // Stop timing before the back-off so the delay is not counted as transfer time.
                stopwatch.Stop();
                Console.WriteLine(ex);
                i--; // retry the same packet
                await Task.Delay(1);
            }
        }

        double approxBandwidth = (packetsToSend * message.Length) / (1_000_000.0 * (stopwatch.ElapsedMilliseconds / 1000.0));
        Console.WriteLine($"Sent {packetsToSend} packets of {message.Length} bytes in {stopwatch.ElapsedMilliseconds:N} milliseconds.");
        Console.WriteLine($"Approximate bandwidth: {approxBandwidth} MBps");
    });

    // Awaiting the client first surfaces a client failure immediately instead of losing it.
    await clientTask;
    await serverTask;
}

共享代码

/// <summary>
/// Immutable description of one received datagram: the buffer holding it, the number of bytes received
/// and the endpoint it came from.
/// </summary>
public readonly struct ReceiveResult
{
    /// <summary>Buffer size, in bytes, used for a single packet.</summary>
    public const int PacketSize = 1024;

    /// <summary>Buffer that holds the received data.</summary>
    public readonly Memory<byte> Contents;

    /// <summary>Number of bytes reported as received.</summary>
    public readonly int ReceivedBytes;

    /// <summary>Endpoint reported as the sender of the datagram.</summary>
    public readonly EndPoint RemoteEndPoint;

    /// <summary>Creates a result from its three components.</summary>
    /// <param name="contents">Buffer that holds the received data.</param>
    /// <param name="receivedBytes">Number of bytes received.</param>
    /// <param name="remoteEndPoint">Endpoint the data was received from.</param>
    public ReceiveResult(Memory<byte> contents, int receivedBytes, EndPoint remoteEndPoint)
        => (Contents, ReceivedBytes, RemoteEndPoint) = (contents, receivedBytes, remoteEndPoint);
}

UDP客户端

/// <summary>
/// UDP implementation of <see cref="SocketClient"/> built on the task-based <c>ReceiveFromAsync</c> and
/// <c>SendToAsync</c> socket methods.
/// </summary>
public class UdpClient : SocketClient
{
    /*
    For reference, the base class:

    public abstract class SocketClient
    {
    protected readonly Socket socket;
    protected SocketClient(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType)
    {
    socket = new Socket(addressFamily, socketType, protocolType);
    }
    public bool Bind(in EndPoint localEndPoint)
    {
    try
    {
    socket.Bind(localEndPoint);
    return true;
    }
    catch (Exception ex)
    {
    Console.WriteLine(ex);
    return false;
    }
    }
    public bool Connect(in EndPoint remoteEndPoint)
    {
    try
    {
    socket.Connect(remoteEndPoint);
    return true;
    }
    catch (Exception ex)
    {
    Console.WriteLine(ex);
    return false;
    }
    }
    public abstract Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags);
    public abstract Task<int> SendAsync(EndPoint remoteEndPoint, ArraySegment<byte> buffer, SocketFlags socketFlags);
    }
    */

    /// <summary>Creates a client backed by an IPv4 datagram (UDP) socket.</summary>
    public UdpClient() : base(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)
    {
    }

    /// <summary>Receives one datagram into a freshly allocated buffer.</summary>
    /// <param name="remoteEndPoint">Endpoint passed to the socket receive call.</param>
    /// <param name="socketFlags">Flags passed to the socket receive call.</param>
    /// <returns>
    /// The receive buffer (always <see cref="ReceiveResult.PacketSize"/> bytes long), the number of bytes
    /// actually received and the sender's endpoint.
    /// </returns>
    public override async Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags)
    {
        byte[] buffer = new byte[ReceiveResult.PacketSize];
        SocketReceiveFromResult result =
            await socket.ReceiveFromAsync(new ArraySegment<byte>(buffer), socketFlags, remoteEndPoint);
        return new ReceiveResult(buffer, result.ReceivedBytes, result.RemoteEndPoint);
    }

    /// <summary>Sends <paramref name="buffer"/> to <paramref name="remoteEndPoint"/> as one datagram.</summary>
    /// <param name="remoteEndPoint">Destination endpoint.</param>
    /// <param name="buffer">Bytes to send.</param>
    /// <param name="socketFlags">Flags passed to the socket send call.</param>
    /// <returns>The number of bytes sent.</returns>
    public override async Task<int> SendAsync(EndPoint remoteEndPoint, ArraySegment<byte> buffer, SocketFlags socketFlags)
    {
        // SendToAsync takes the segment as-is; copying it into a new array on every send was wasted work.
        return await socket.SendToAsync(buffer, socketFlags, remoteEndPoint);
    }
}

UDP服务器

/// <summary>
/// UDP echo server built directly on <see cref="SocketAsyncEventArgs"/>. Every receive and send is started
/// with the event-based socket API and surfaced to the caller as a <see cref="Task"/> through a
/// <see cref="TaskCompletionSource{TResult}"/>.
/// </summary>
public class UdpServer : SocketServer
{
    /*
    For reference, the base class:

    public abstract class SocketServer
    {
    protected readonly Socket socket;
    protected SocketServer(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType)
    {
    socket = new Socket(addressFamily, socketType, protocolType);
    }
    public bool Bind(in EndPoint localEndPoint)
    {
    try
    {
    socket.Bind(localEndPoint);
    return true;
    }
    catch (Exception ex)
    {
    Console.WriteLine(ex);
    return false;
    }
    }
    public abstract Task StartAsync();
    }
    */

    private const int MaxPooledObjects = 100;

    private readonly ConcurrentDictionary<EndPoint, ConcurrentQueue<byte[]>> clients;

    private readonly ArrayPool<byte> receiveBufferPool = ArrayPool<byte>.Create(ReceiveResult.PacketSize, MaxPooledObjects);

    private readonly ObjectPool<SocketAsyncEventArgs> receiveSocketAsyncEventArgsPool =
        new DefaultObjectPool<SocketAsyncEventArgs>(new DefaultPooledObjectPolicy<SocketAsyncEventArgs>(), MaxPooledObjects);

    private readonly ObjectPool<SocketAsyncEventArgs> sendSocketAsyncEventArgsPool =
        new DefaultObjectPool<SocketAsyncEventArgs>(new DefaultPooledObjectPolicy<SocketAsyncEventArgs>(), MaxPooledObjects);

    /// <summary>Creates a server backed by an IPv4 datagram (UDP) socket.</summary>
    public UdpServer() : base(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)
    {
        clients = new ConcurrentDictionary<EndPoint, ConcurrentQueue<byte[]>>();
    }

    /// <inheritdoc />
    /// <remarks>
    /// Loops forever: receives one datagram, logs it, echoes exactly the received bytes back to the sender
    /// and logs the send. A failed receive or send surfaces as an exception from the returned task.
    /// </remarks>
    public override async Task StartAsync()
    {
        EndPoint nullEndPoint = new IPEndPoint(IPAddress.Any, 0);
        byte[] receiveBuffer = new byte[ReceiveResult.PacketSize];
        Memory<byte> receiveBufferMemory = new Memory<byte>(receiveBuffer);
        while (true)
        {
            ReceiveResult result = await ReceiveAsync(nullEndPoint, SocketFlags.None, receiveBufferMemory);
            Console.WriteLine($"[{result.RemoteEndPoint} > Server] : {Encoding.UTF8.GetString(result.Contents.Span)}");
            int sentBytes = await SendAsync(result.RemoteEndPoint, result.Contents, SocketFlags.None);
            Console.WriteLine($"[Server > {result.RemoteEndPoint}] Sent {sentBytes} bytes to {result.RemoteEndPoint}");
        }
    }

    /// <summary>
    /// <see cref="SocketAsyncEventArgs.Completed"/> callback for operations that finished asynchronously.
    /// Detaches itself and dispatches on the operation that completed.
    /// </summary>
    /// <param name="sender">Event source; not used.</param>
    /// <param name="eventArgs">The args instance whose operation completed.</param>
    private void HandleIOCompleted(object? sender, SocketAsyncEventArgs eventArgs)
    {
        // The handler is attached once per operation, so it is detached once per completion.
        eventArgs.Completed -= HandleIOCompleted;
        switch (eventArgs.LastOperation)
        {
            case SocketAsyncOperation.SendTo:
                CompleteSend(eventArgs);
                break;
            case SocketAsyncOperation.ReceiveFrom:
                CompleteReceive(eventArgs);
                break;
            case SocketAsyncOperation.Disconnect:
                // handle the client closing the connection on tcp servers at some point
                break;
            default:
                // Accept, Connect, None and every other operation are not used by this server.
                break;
        }
    }

    /// <summary>
    /// Finishes a receive: copies the received bytes to the caller's buffer, releases the pooled buffer
    /// and args, then completes the pending task. Used for both synchronous and asynchronous completion.
    /// </summary>
    /// <param name="eventArgs">Args whose <c>UserToken</c> is the <see cref="AsyncReadToken"/> of the receive.</param>
    private void CompleteReceive(SocketAsyncEventArgs eventArgs)
    {
        AsyncReadToken token = (AsyncReadToken)eventArgs.UserToken;

        // Read everything needed before the args instance goes back to the pool.
        SocketError error = eventArgs.SocketError;
        int received = eventArgs.BytesTransferred;
        var remoteEndPoint = eventArgs.RemoteEndPoint;
        if (error == SocketError.Success)
        {
            eventArgs.MemoryBuffer.Slice(0, received).CopyTo(token.OutputBuffer);
        }

        receiveBufferPool.Return(token.RentedBuffer);
        receiveSocketAsyncEventArgsPool.Return(eventArgs);

        // The task is completed last so that no pooled resource is still in use when the awaiter resumes.
        if (error != SocketError.Success)
        {
            token.CompletionSource.TrySetException(new SocketException((int)error));
        }
        else
        {
            // Expose only the bytes that were received, not the stale tail of the caller's buffer.
            token.CompletionSource.TrySetResult(
                new ReceiveResult(token.OutputBuffer.Slice(0, received), received, remoteEndPoint));
        }
    }

    /// <summary>
    /// Finishes a send: releases the pooled args, then completes the pending task with the byte count or
    /// the socket error. Used for both synchronous and asynchronous completion.
    /// </summary>
    /// <param name="eventArgs">Args whose <c>UserToken</c> is the <see cref="AsyncWriteToken"/> of the send.</param>
    private void CompleteSend(SocketAsyncEventArgs eventArgs)
    {
        AsyncWriteToken token = (AsyncWriteToken)eventArgs.UserToken;
        SocketError error = eventArgs.SocketError;
        int sent = eventArgs.BytesTransferred;

        sendSocketAsyncEventArgsPool.Return(eventArgs);

        if (error != SocketError.Success)
        {
            token.CompletionSource.TrySetException(new SocketException((int)error));
        }
        else
        {
            token.CompletionSource.TrySetResult(sent);
        }
    }

    /// <summary>Starts a single datagram receive.</summary>
    /// <param name="remoteEndPoint">Endpoint assigned to the args before the receive is started.</param>
    /// <param name="socketFlags">Flags for the receive.</param>
    /// <param name="outputBuffer">Caller-owned buffer the received bytes are copied into.</param>
    /// <returns>
    /// A task that yields the received bytes (a slice of <paramref name="outputBuffer"/>), their count and
    /// the sender, or faults with a <see cref="SocketException"/> when the socket reports an error.
    /// </returns>
    private Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags, Memory<byte> outputBuffer)
    {
        // Run continuations asynchronously so the awaiter never executes inside the completion callback.
        TaskCompletionSource<ReceiveResult> tcs =
            new TaskCompletionSource<ReceiveResult>(TaskCreationOptions.RunContinuationsAsynchronously);
        byte[] buffer = receiveBufferPool.Rent(ReceiveResult.PacketSize);
        SocketAsyncEventArgs args = receiveSocketAsyncEventArgsPool.Get();

        // Never receive more than the caller's buffer can take, even if the pool hands out a larger array.
        int capacity = Math.Min(ReceiveResult.PacketSize, outputBuffer.Length);
        args.SetBuffer(new Memory<byte>(buffer, 0, capacity));
        args.SocketFlags = socketFlags;
        args.RemoteEndPoint = remoteEndPoint;
        args.UserToken = new AsyncReadToken(buffer, outputBuffer, tcs);
        args.Completed += HandleIOCompleted;

        bool pending;
        try
        {
            pending = socket.ReceiveFromAsync(args);
        }
        catch
        {
            // The operation never started: undo the subscription and give the pooled objects back.
            args.Completed -= HandleIOCompleted;
            receiveBufferPool.Return(buffer);
            receiveSocketAsyncEventArgsPool.Return(args);
            throw;
        }

        if (!pending)
        {
            // Synchronous completion does not raise Completed. The handler must be detached here, or it
            // piles up on the pooled instance, and SocketError must be checked like on the async path.
            args.Completed -= HandleIOCompleted;
            CompleteReceive(args);
        }

        return tcs.Task;
    }

    /// <summary>Starts a single datagram send.</summary>
    /// <param name="remoteEndPoint">Destination endpoint.</param>
    /// <param name="buffer">Bytes to send.</param>
    /// <param name="socketFlags">Flags for the send.</param>
    /// <returns>
    /// A task that yields the number of bytes sent, or faults with a <see cref="SocketException"/> when the
    /// socket reports an error.
    /// </returns>
    private Task<int> SendAsync(EndPoint remoteEndPoint, Memory<byte> buffer, SocketFlags socketFlags)
    {
        TaskCompletionSource<int> tcs =
            new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
        SocketAsyncEventArgs args = sendSocketAsyncEventArgsPool.Get();
        args.SetBuffer(buffer);
        args.SocketFlags = socketFlags;
        args.RemoteEndPoint = remoteEndPoint;
        args.UserToken = new AsyncWriteToken(buffer, tcs);
        args.Completed += HandleIOCompleted;

        bool pending;
        try
        {
            pending = socket.SendToAsync(args);
        }
        catch
        {
            args.Completed -= HandleIOCompleted;
            sendSocketAsyncEventArgsPool.Return(args);
            throw;
        }

        if (!pending)
        {
            // Same rule as for receives: no Completed event on synchronous completion.
            args.Completed -= HandleIOCompleted;
            CompleteSend(args);
        }

        return tcs.Task;
    }

    /// <summary>State attached to the args of a pending receive.</summary>
    private readonly struct AsyncReadToken
    {
        /// <summary>Completes the task returned to the caller of the receive.</summary>
        public readonly TaskCompletionSource<ReceiveResult> CompletionSource;

        /// <summary>Caller-owned buffer that receives a copy of the data.</summary>
        public readonly Memory<byte> OutputBuffer;

        /// <summary>Pooled array the socket receives into; returned to the pool on completion.</summary>
        public readonly byte[] RentedBuffer;

        public AsyncReadToken(byte[] rentedBuffer, Memory<byte> outputBuffer, TaskCompletionSource<ReceiveResult> tcs)
        {
            RentedBuffer = rentedBuffer;
            OutputBuffer = outputBuffer;
            CompletionSource = tcs;
        }
    }

    /// <summary>State attached to the args of a pending send.</summary>
    private readonly struct AsyncWriteToken
    {
        /// <summary>Completes the task returned to the caller of the send.</summary>
        public readonly TaskCompletionSource<int> CompletionSource;

        /// <summary>Buffer being sent.</summary>
        public readonly Memory<byte> OutputBuffer;

        public AsyncWriteToken(Memory<byte> outputBuffer, TaskCompletionSource<int> tcs)
        {
            OutputBuffer = outputBuffer;
            CompletionSource = tcs;
        }
    }
}

我设法解决了这个问题

我最终不得不合并SocketAsyncEventArgs池,因为事实证明,在接收和发送调用期间,您需要保留同一个args对象。现在,我的SendToAsync函数接收一个SocketAsyncEventArgs对象(在ReceiveFromAsync调用中租用),该对象包含要向其发送响应的客户端的RemoteEndPoint。SendToAsync函数负责清理SocketAsyncEventArgs,并将它们返回到池中。

我早期解决方案的另一个问题是多次订阅事件处理程序。当我合并两个套接字args池时,我留下了重复的事件处理程序订阅,这最终导致了问题。一旦解决了这个问题,该解决方案就完全按照预期工作,可以毫无问题地发送1 000 000个数据包(每个1 KB)。非常早期的测试(可能不太准确)显示带宽大约为每秒5兆字节(大约每秒40兆比特),这是可以接受的,而且比我自己那个过于复杂的"快速异步"版本要好得多。

关于带宽,我的快速异步版本过于复杂,因此不具有可比性,但我相信这个SocketAsyncEventArgs版本可以作为基准测试和修补的一个很好的起点,以尽可能多地从套接字中挤出性能。不过,我仍然希望得到有关这方面的反馈,并可能在某个时候将其发布到代码审查堆栈交换中,因为我怀疑解决方案中是否仍然存在细微的错误。

无论谁想使用这个代码,都可以自由使用,它最终比预期的要简单得多,也更容易创建,但如果你愚蠢到在没有经过广泛测试的情况下在生产中使用它,我不承担任何责任(毕竟这是一个学习项目)。


测试代码:

/// <summary>
/// Runs a loopback echo benchmark: starts a <see cref="UdpServer"/> on 127.0.0.1:12345 in the background,
/// then drives a <see cref="UdpClient"/> (bound to port 7007) that sends a fixed message and waits for
/// each echo, printing progress and an approximate bandwidth figure.
/// </summary>
/// <returns>A task that completes when the client has finished; the server keeps running.</returns>
private static async Task TestNetworking()
{
    EndPoint serverEndPoint = new IPEndPoint(IPAddress.Loopback, 12345);

    // Fire-and-forget on purpose: the server loop never ends, and it reports its own failures.
    // Task.Run unwraps the async lambda, unlike Task.Factory.StartNew, which returns Task<Task>.
    _ = Task.Run(async () =>
    {
        try
        {
            SocketServer server = new UdpServer();
            if (server.Bind(serverEndPoint))
            {
                Console.WriteLine($"[Server] Bound server socket!");
                Console.WriteLine($"[Server] Starting server at {serverEndPoint}!");
                await server.StartAsync();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    });

    // Awaited directly instead of blocking on .Result.
    await Task.Run(async () =>
    {
        // Private copy: the client follows whatever endpoint the replies come from, and must not
        // mutate the variable the server lambda reads.
        EndPoint remoteEndPoint = serverEndPoint;
        SocketClient client = new UdpClient();
        bool bound = client.Bind(new IPEndPoint(IPAddress.Any, 7007));
        if (bound)
        {
            Console.WriteLine($"[Client] Bound client socket!");
        }
        if (!bound || !client.Connect(serverEndPoint))
        {
            return;
        }

        Console.WriteLine($"[Client] Connected to {serverEndPoint}!");
        byte[] message = Encoding.UTF8.GetBytes("Hello World!");
        Memory<byte> messageBuffer = new Memory<byte>(message);
        byte[] response = new byte[ReceiveResult.PacketSize];
        Memory<byte> responseBuffer = new Memory<byte>(response);
        Stopwatch stopwatch = new Stopwatch();
        const int packetsToSend = 1_000_000, statusPacketThreshold = 10_000;
        Console.WriteLine($"Started sending packets (total packet count: {packetsToSend})");
        for (int i = 0; i < packetsToSend; i++)
        {
            if (i % statusPacketThreshold == 0)
            {
                Console.WriteLine($"Sent {i} packets out of {packetsToSend} ({((double)i / packetsToSend) * 100:F2}%)");
            }
            try
            {
                stopwatch.Start();
                await client.SendAsync(remoteEndPoint, messageBuffer, SocketFlags.None);
                ReceiveResult result = await client.ReceiveAsync(remoteEndPoint, SocketFlags.None, responseBuffer);
                remoteEndPoint = result.RemoteEndPoint;
                stopwatch.Stop();
            }
            catch (Exception ex)
            {
                // Stop timing before the back-off so the delay is not counted as transfer time.
                stopwatch.Stop();
                Console.WriteLine(ex);
                i--; // retry the same packet
                await Task.Delay(1);
            }
        }

        double approxBandwidth = (packetsToSend * ReceiveResult.PacketSize) / (1_000_000.0 * (stopwatch.ElapsedMilliseconds / 1000.0));
        Console.WriteLine($"Sent {packetsToSend} packets of {ReceiveResult.PacketSize} bytes in {stopwatch.ElapsedMilliseconds:N} milliseconds.");
        Console.WriteLine($"Approximate bandwidth: {approxBandwidth} MBps");
    });
}

共享代码:

/// <summary>
/// Per-operation state for a pending receive: the pooled buffer, the caller's buffer, the completion
/// source of the returned task and the caller's cancellation token.
/// </summary>
internal readonly struct AsyncReadToken
{
    /// <summary>Token supplied by the caller of the receive.</summary>
    public readonly CancellationToken CancellationToken;

    /// <summary>Completion source behind the task handed to the caller.</summary>
    public readonly TaskCompletionSource<ReceiveResult> CompletionSource;

    /// <summary>Array rented for the operation.</summary>
    public readonly byte[] RentedBuffer;

    /// <summary>Buffer owned by the caller.</summary>
    public readonly Memory<byte> UserBuffer;

    /// <summary>Bundles the state of one receive operation.</summary>
    /// <param name="rentedBuffer">Array rented for the operation.</param>
    /// <param name="userBuffer">Buffer owned by the caller.</param>
    /// <param name="tcs">Completion source behind the task handed to the caller.</param>
    /// <param name="cancellationToken">Optional cancellation token; defaults to none.</param>
    public AsyncReadToken(byte[] rentedBuffer, Memory<byte> userBuffer, TaskCompletionSource<ReceiveResult> tcs,
        CancellationToken cancellationToken = default)
    {
        this.CancellationToken = cancellationToken;
        this.CompletionSource = tcs;
        this.RentedBuffer = rentedBuffer;
        this.UserBuffer = userBuffer;
    }
}
/// <summary>
/// Per-operation state for a pending send: the pooled buffer, the caller's buffer, the completion source
/// of the returned task and the caller's cancellation token.
/// </summary>
internal readonly struct AsyncWriteToken
{
    /// <summary>Token supplied by the caller of the send.</summary>
    public readonly CancellationToken CancellationToken;

    /// <summary>Completion source behind the task handed to the caller.</summary>
    public readonly TaskCompletionSource<int> CompletionSource;

    /// <summary>Array rented for the operation.</summary>
    public readonly byte[] RentedBuffer;

    /// <summary>Buffer owned by the caller.</summary>
    public readonly Memory<byte> UserBuffer;

    /// <summary>Bundles the state of one send operation.</summary>
    /// <param name="rentedBuffer">Array rented for the operation.</param>
    /// <param name="userBuffer">Buffer owned by the caller.</param>
    /// <param name="tcs">Completion source behind the task handed to the caller.</param>
    /// <param name="cancellationToken">Optional cancellation token; defaults to none.</param>
    public AsyncWriteToken(byte[] rentedBuffer, Memory<byte> userBuffer, TaskCompletionSource<int> tcs,
        CancellationToken cancellationToken = default)
    {
        this.CancellationToken = cancellationToken;
        this.CompletionSource = tcs;
        this.RentedBuffer = rentedBuffer;
        this.UserBuffer = userBuffer;
    }
}
/// <summary>
/// Immutable description of one received datagram, together with the <see cref="SocketAsyncEventArgs"/>
/// instance that was used to receive it.
/// </summary>
public readonly struct ReceiveResult
{
    /// <summary>Buffer size, in bytes, used for a single packet.</summary>
    public const int PacketSize = 1024;

    /// <summary>Args instance associated with the receive.</summary>
    public readonly SocketAsyncEventArgs ClientArgs;

    /// <summary>Buffer that holds the received data.</summary>
    public readonly Memory<byte> Contents;

    /// <summary>Number of bytes reported as received.</summary>
    public readonly int Count;

    /// <summary>Endpoint reported as the sender of the datagram.</summary>
    public readonly EndPoint RemoteEndPoint;

    /// <summary>Creates a result from its four components.</summary>
    /// <param name="clientArgs">Args instance associated with the receive.</param>
    /// <param name="contents">Buffer that holds the received data.</param>
    /// <param name="count">Number of bytes received.</param>
    /// <param name="remoteEndPoint">Endpoint the data was received from.</param>
    public ReceiveResult(SocketAsyncEventArgs clientArgs, Memory<byte> contents, int count, EndPoint remoteEndPoint)
        => (ClientArgs, Contents, Count, RemoteEndPoint) = (clientArgs, contents, count, remoteEndPoint);
}

服务器代码:

/// <summary>Base class for servers that own a single <see cref="Socket"/>.</summary>
public abstract class SocketServer
{
    /// <summary>The socket created by the constructor and used by derived classes.</summary>
    protected readonly Socket socket;

    /// <summary>Creates the underlying socket with the given family, type and protocol.</summary>
    protected SocketServer(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType)
        => socket = new Socket(addressFamily, socketType, protocolType);

    /// <summary>Binds the socket to <paramref name="localEndPoint"/>.</summary>
    /// <param name="localEndPoint">Local endpoint to bind to.</param>
    /// <returns>
    /// <c>true</c> when the bind succeeded; <c>false</c> when it threw, in which case the exception is
    /// written to the console.
    /// </returns>
    public bool Bind(in EndPoint localEndPoint)
    {
        bool succeeded = false;
        try
        {
            socket.Bind(localEndPoint);
            succeeded = true;
        }
        catch (Exception failure)
        {
            Console.WriteLine(failure);
        }

        return succeeded;
    }

    /// <summary>Starts the server; the behaviour is defined by the derived class.</summary>
    public abstract Task StartAsync();
}
/// <summary>
/// UDP echo server built on pooled <see cref="SocketAsyncEventArgs"/>. The args instance that received a
/// datagram is handed on to the matching send (it already carries the sender's endpoint) and goes back to
/// the pool once that send has finished.
/// </summary>
public class UdpServer : SocketServer
{
    private const int MaxPooledObjects = 1;

    private readonly ConcurrentDictionary<EndPoint, ConcurrentQueue<byte[]>> clients;

    private readonly ArrayPool<byte> receiveBufferPool =
        ArrayPool<byte>.Create(ReceiveResult.PacketSize, MaxPooledObjects);

    private readonly ArrayPool<byte> sendBufferPool =
        ArrayPool<byte>.Create(ReceiveResult.PacketSize, MaxPooledObjects);

    private readonly ObjectPool<SocketAsyncEventArgs> socketAsyncEventArgsPool =
        new DefaultObjectPool<SocketAsyncEventArgs>(new DefaultPooledObjectPolicy<SocketAsyncEventArgs>(),
            MaxPooledObjects);

    /// <summary>
    /// Creates a server backed by an IPv4 datagram (UDP) socket and pre-populates the args pool with
    /// instances that already have <see cref="HandleIOCompleted"/> attached.
    /// </summary>
    public UdpServer() : base(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)
    {
        clients = new ConcurrentDictionary<EndPoint, ConcurrentQueue<byte[]>>();
        for (int i = 0; i < MaxPooledObjects; i++)
        {
            SocketAsyncEventArgs args = new SocketAsyncEventArgs();
            args.Completed += HandleIOCompleted;
            socketAsyncEventArgsPool.Return(args);
        }
    }

    /// <inheritdoc />
    /// <remarks>
    /// Loops forever: receives one datagram and echoes the receive buffer back to the sender using the
    /// same args instance. A failed receive or send surfaces as an exception from the returned task.
    /// </remarks>
    public override async Task StartAsync()
    {
        EndPoint nullEndPoint = new IPEndPoint(IPAddress.Any, 0);
        byte[] receiveBuffer = new byte[ReceiveResult.PacketSize];
        Memory<byte> receiveBufferMemory = new Memory<byte>(receiveBuffer);
        while (true)
        {
            ReceiveResult result = await ReceiveAsync(nullEndPoint, SocketFlags.None, receiveBufferMemory);
            await SendAsync(result.ClientArgs, result.Contents, SocketFlags.None);
        }
    }

    /// <summary>
    /// <see cref="SocketAsyncEventArgs.Completed"/> callback for operations that finished asynchronously;
    /// dispatches on the operation that completed.
    /// </summary>
    /// <param name="sender">Event source; not used.</param>
    /// <param name="eventArgs">The args instance whose operation completed.</param>
    /// <exception cref="NotImplementedException">The operation is one this server does not handle.</exception>
    /// <exception cref="ArgumentOutOfRangeException">The operation is not a known enum value.</exception>
    private void HandleIOCompleted(object? sender, SocketAsyncEventArgs eventArgs)
    {
        switch (eventArgs.LastOperation)
        {
            case SocketAsyncOperation.SendTo:
                CompleteSend(eventArgs);
                break;
            case SocketAsyncOperation.ReceiveFrom:
                CompleteReceive(eventArgs);
                break;
            case SocketAsyncOperation.Disconnect:
                // handle the client closing the connection on tcp servers at some point
                break;
            case SocketAsyncOperation.Accept:
            case SocketAsyncOperation.Connect:
            case SocketAsyncOperation.None:
            case SocketAsyncOperation.Receive:
            case SocketAsyncOperation.ReceiveMessageFrom:
            case SocketAsyncOperation.Send:
            case SocketAsyncOperation.SendPackets:
                throw new NotImplementedException();
            default:
                throw new ArgumentOutOfRangeException();
        }
    }

    /// <summary>
    /// Takes an args instance from the pool and guarantees it has exactly one
    /// <see cref="HandleIOCompleted"/> subscription.
    /// </summary>
    /// <returns>An args instance ready to be configured for a receive.</returns>
    private SocketAsyncEventArgs RentEventArgs()
    {
        SocketAsyncEventArgs args = socketAsyncEventArgsPool.Get();

        // When the pool is empty it hands out an instance the constructor never wired up, and an async
        // completion on it would never be observed. Remove-then-add leaves exactly one subscription
        // whether or not the instance was already wired.
        args.Completed -= HandleIOCompleted;
        args.Completed += HandleIOCompleted;
        return args;
    }

    /// <summary>
    /// Finishes a receive, for both synchronous and asynchronous completion. On success the data is copied
    /// to the caller's buffer and the args instance travels on inside the result. On failure or
    /// cancellation the args instance is returned to the pool, because no send will ever do so.
    /// </summary>
    /// <param name="eventArgs">Args whose <c>UserToken</c> is the <see cref="AsyncReadToken"/> of the receive.</param>
    private void CompleteReceive(SocketAsyncEventArgs eventArgs)
    {
        AsyncReadToken token = (AsyncReadToken)eventArgs.UserToken;
        bool canceled = token.CancellationToken.IsCancellationRequested;
        SocketError error = eventArgs.SocketError;
        bool succeeded = !canceled && error == SocketError.Success;

        ReceiveResult result = default;
        if (succeeded)
        {
            eventArgs.MemoryBuffer.CopyTo(token.UserBuffer);
            result = new ReceiveResult(eventArgs, token.UserBuffer,
                eventArgs.BytesTransferred, eventArgs.RemoteEndPoint);
        }

        // All clean-up happens before the task is completed, so nothing pooled is still being touched
        // here when the awaiter resumes.
        receiveBufferPool.Return(token.RentedBuffer, true);

        if (succeeded)
        {
            token.CompletionSource.TrySetResult(result);
            return;
        }

        socketAsyncEventArgsPool.Return(eventArgs);
        if (canceled)
        {
            token.CompletionSource.TrySetCanceled();
        }
        else
        {
            token.CompletionSource.TrySetException(new SocketException((int)error));
        }
    }

    /// <summary>
    /// Finishes a send, for both synchronous and asynchronous completion: releases the pooled buffer and
    /// the args instance, then completes the pending task.
    /// </summary>
    /// <param name="eventArgs">Args whose <c>UserToken</c> is the <see cref="AsyncWriteToken"/> of the send.</param>
    private void CompleteSend(SocketAsyncEventArgs eventArgs)
    {
        AsyncWriteToken token = (AsyncWriteToken)eventArgs.UserToken;
        SocketError error = eventArgs.SocketError;
        int sent = eventArgs.BytesTransferred;

        sendBufferPool.Return(token.RentedBuffer, true);
        socketAsyncEventArgsPool.Return(eventArgs);

        if (token.CancellationToken.IsCancellationRequested)
        {
            token.CompletionSource.TrySetCanceled();
        }
        else if (error != SocketError.Success)
        {
            token.CompletionSource.TrySetException(new SocketException((int)error));
        }
        else
        {
            token.CompletionSource.TrySetResult(sent);
        }
    }

    /// <summary>Starts a single datagram receive.</summary>
    /// <param name="remoteEndPoint">Endpoint assigned to the args before the receive is started.</param>
    /// <param name="socketFlags">Flags for the receive.</param>
    /// <param name="outputBuffer">Caller-owned buffer the receive buffer is copied into.</param>
    /// <param name="cancellationToken">Checked when the operation completes; it does not abort a pending receive.</param>
    /// <returns>
    /// A task that yields the result (including the args instance to pass to <see cref="SendAsync"/>), is
    /// cancelled if the token was signalled, or faults with a <see cref="SocketException"/>.
    /// </returns>
    private Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags,
        Memory<byte> outputBuffer, CancellationToken cancellationToken = default)
    {
        // Run continuations asynchronously so the awaiter never executes inside the completion callback.
        TaskCompletionSource<ReceiveResult> tcs =
            new TaskCompletionSource<ReceiveResult>(TaskCreationOptions.RunContinuationsAsynchronously);
        byte[] rentedBuffer = receiveBufferPool.Rent(ReceiveResult.PacketSize);
        SocketAsyncEventArgs args = RentEventArgs();

        // Bound the receive window to PacketSize even if the pool hands out a larger array.
        args.SetBuffer(new Memory<byte>(rentedBuffer, 0, ReceiveResult.PacketSize));
        args.SocketFlags = socketFlags;
        args.RemoteEndPoint = remoteEndPoint;
        args.UserToken = new AsyncReadToken(rentedBuffer, outputBuffer, tcs, cancellationToken);

        bool pending;
        try
        {
            pending = socket.ReceiveFromAsync(args);
        }
        catch
        {
            // The operation never started: give the pooled objects back before propagating.
            receiveBufferPool.Return(rentedBuffer, true);
            socketAsyncEventArgsPool.Return(args);
            throw;
        }

        if (!pending)
        {
            // Synchronous completion does not raise Completed; finish here so that SocketError is
            // honoured exactly as on the asynchronous path.
            CompleteReceive(args);
        }

        return tcs.Task;
    }

    /// <summary>Starts a single datagram send on the args instance that received the request.</summary>
    /// <param name="clientArgs">Args from a previous receive; its remote endpoint is the destination.</param>
    /// <param name="inputBuffer">Bytes to send; they are copied into a pooled buffer first.</param>
    /// <param name="socketFlags">Flags for the send.</param>
    /// <param name="cancellationToken">Checked when the operation completes; it does not abort a pending send.</param>
    /// <returns>
    /// A task that yields the number of bytes sent, is cancelled if the token was signalled, or faults
    /// with a <see cref="SocketException"/>.
    /// </returns>
    private Task<int> SendAsync(SocketAsyncEventArgs clientArgs, Memory<byte> inputBuffer, SocketFlags socketFlags,
        CancellationToken cancellationToken = default)
    {
        TaskCompletionSource<int> tcs =
            new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
        byte[] rentedBuffer = sendBufferPool.Rent(inputBuffer.Length);

        // Send exactly the caller's bytes rather than the whole rented array.
        Memory<byte> memoryBuffer = new Memory<byte>(rentedBuffer, 0, inputBuffer.Length);
        inputBuffer.CopyTo(memoryBuffer);

        SocketAsyncEventArgs args = clientArgs;
        args.SetBuffer(memoryBuffer);
        args.SocketFlags = socketFlags;
        args.UserToken = new AsyncWriteToken(rentedBuffer, inputBuffer, tcs, cancellationToken);

        bool pending;
        try
        {
            pending = socket.SendToAsync(args);
        }
        catch
        {
            sendBufferPool.Return(rentedBuffer, true);
            socketAsyncEventArgsPool.Return(args);
            throw;
        }

        if (!pending)
        {
            CompleteSend(args);
        }

        return tcs.Task;
    }
}

客户代码:

/// <summary>Base class for clients that own a single <see cref="Socket"/>.</summary>
public abstract class SocketClient
{
    /// <summary>The socket created by the constructor and used by derived classes.</summary>
    protected readonly Socket socket;

    /// <summary>Creates the underlying socket with the given family, type and protocol.</summary>
    protected SocketClient(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType)
        => socket = new Socket(addressFamily, socketType, protocolType);

    /// <summary>Binds the socket to <paramref name="localEndPoint"/>.</summary>
    /// <param name="localEndPoint">Local endpoint to bind to.</param>
    /// <returns>
    /// <c>true</c> when the bind succeeded; <c>false</c> when it threw, in which case the exception is
    /// written to the console.
    /// </returns>
    public bool Bind(in EndPoint localEndPoint)
    {
        bool succeeded = false;
        try
        {
            socket.Bind(localEndPoint);
            succeeded = true;
        }
        catch (Exception failure)
        {
            Console.WriteLine(failure);
        }

        return succeeded;
    }

    /// <summary>Connects the socket to <paramref name="remoteEndPoint"/>.</summary>
    /// <param name="remoteEndPoint">Remote endpoint to connect to.</param>
    /// <returns>
    /// <c>true</c> when the connect succeeded; <c>false</c> when it threw, in which case the exception is
    /// written to the console.
    /// </returns>
    public bool Connect(in EndPoint remoteEndPoint)
    {
        bool succeeded = false;
        try
        {
            socket.Connect(remoteEndPoint);
            succeeded = true;
        }
        catch (Exception failure)
        {
            Console.WriteLine(failure);
        }

        return succeeded;
    }

    /// <summary>Receives data into <paramref name="outputBuffer"/>; defined by the derived class.</summary>
    public abstract Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags,
        Memory<byte> outputBuffer);

    /// <summary>Sends <paramref name="inputBuffer"/> to <paramref name="remoteEndPoint"/>; defined by the derived class.</summary>
    public abstract Task<int> SendAsync(EndPoint remoteEndPoint, Memory<byte> inputBuffer, SocketFlags socketFlags);
}
/// <summary>
/// UDP implementation of <see cref="SocketClient"/> built on the task-based <c>ReceiveFromAsync</c> and
/// <c>SendToAsync</c> socket methods.
/// </summary>
public class UdpClient : SocketClient
{
    /// <summary>Creates a client backed by an IPv4 datagram (UDP) socket.</summary>
    public UdpClient() : base(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)
    {
    }

    /// <summary>Receives one datagram and copies it into <paramref name="outputBuffer"/>.</summary>
    /// <param name="remoteEndPoint">Endpoint passed to the socket receive call.</param>
    /// <param name="socketFlags">Flags passed to the socket receive call.</param>
    /// <param name="outputBuffer">Caller-owned destination buffer.</param>
    /// <returns>
    /// A result whose <c>Contents</c> is <paramref name="outputBuffer"/>, whose <c>Count</c> is the number
    /// of bytes received and whose <c>ClientArgs</c> is <c>null</c>.
    /// </returns>
    /// <exception cref="ArgumentException">
    /// The received datagram does not fit into <paramref name="outputBuffer"/>.
    /// </exception>
    public override async Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags,
        Memory<byte> outputBuffer)
    {
        byte[] buffer = new byte[ReceiveResult.PacketSize];
        SocketReceiveFromResult result =
            await socket.ReceiveFromAsync(new ArraySegment<byte>(buffer), socketFlags, remoteEndPoint);

        // Copy as much of the zero-padded scratch buffer as the destination can hold. A destination of
        // PacketSize bytes or more gets the full buffer; a shorter one works as long as the datagram fits.
        int copyLength = Math.Min(buffer.Length, outputBuffer.Length);
        if (result.ReceivedBytes > copyLength)
        {
            throw new ArgumentException("The output buffer is too small for the received datagram.",
                nameof(outputBuffer));
        }

        new Memory<byte>(buffer, 0, copyLength).CopyTo(outputBuffer);
        return new ReceiveResult(default, outputBuffer, result.ReceivedBytes, result.RemoteEndPoint);
    }

    /// <summary>Sends <paramref name="buffer"/> to <paramref name="remoteEndPoint"/> as one datagram.</summary>
    /// <param name="remoteEndPoint">Destination endpoint.</param>
    /// <param name="buffer">Bytes to send.</param>
    /// <param name="socketFlags">Flags passed to the socket send call.</param>
    /// <returns>The number of bytes sent.</returns>
    public override async Task<int> SendAsync(EndPoint remoteEndPoint, Memory<byte> buffer, SocketFlags socketFlags)
    {
        // Array-backed memory (the common case) is sent in place; a copy is made only for memory that
        // has no underlying array.
        if (!System.Runtime.InteropServices.MemoryMarshal.TryGetArray<byte>(buffer, out ArraySegment<byte> segment))
        {
            segment = new ArraySegment<byte>(buffer.ToArray());
        }

        return await socket.SendToAsync(segment, socketFlags, remoteEndPoint);
    }
}

最新更新