SocketAsyncEventArgs 发送/接收订单



我最近一直在为一个项目使用 SocketAsyncEventArgs,我遇到了一些问题,即 ReceiveAsync 偶尔会以与通过 SendAsync 发送的顺序不同的顺序获取数据。在 SendAsync 方法中发送的每个数据块都会得到维护,但这些块的顺序不一定正确。也许我对SendAsync方法的理解不正确,但我认为特别是使用SocketType.Stream和ProtocolType.Tcp可以确保保持顺序。我知道底层进程将不可避免地分解消息,并且 ReceiveAsync 通常读取的读数少于缓冲区分配。但我假设发送和接收流会保持秩序。

我开发了一个测试控制台程序来显示问题。它每次尝试使用一组不同的套接字和端口运行大约 20 次。在我的笔记本电脑上,它通常会通过一次,然后第二次失败;通常在期待第二个块时收到较晚的块。从其他测试中,我知道预期的块最终确实会出现,只是顺序不对

。需要注意的是,我能够在Windows 2008远程服务器上对其进行测试,并且没有任何问题。但是,它从未接近于在我的笔记本电脑上完成。事实上,如果我让调试执行在异常中断中挂起一段时间,我已经不止一次完全冻结我的笔记本电脑,并且不得不进行硬重启。这是我在Windows 7上运行的工作笔记本电脑,使用VS2017。我不确定这是否可能是一个因素,但它正在运行赛门铁克端点保护,尽管我在日志中没有找到任何内容。

所以我的问题是,我对SocketAsyncEventArgs的运作方式有不正确的看法吗?还是我的代码是灾难(也许两者兼而有之(?它是我的笔记本电脑独有的吗?(最后一个让我觉得我正在设置尴尬,就像你刚开始编程并且你认为编译器一定有问题一样。

using System;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
static class DumTest
{
static void Main(string[] args)
{
for (int i = 9177; i < 9199; i++)
{
RunDum(i);
//Thread.Sleep(350);
}
Console.WriteLine("all done.");
Console.ReadLine();
}
static void RunDum(int port)
{
var dr = new DumReceiver(port);
var ds = new DumSender(port);
dr.Acception.Wait();
ds.Connection.Wait();
dr.Completion.Wait();
ds.Completion.Wait();
Console.WriteLine($"Completed {port}. " +
$"sent: {ds.SegmentsSent} segments, received: {dr.SegmentsRead} segments");
}
}
class DumReceiver
{
private readonly SocketAsyncEventArgs eva = new SocketAsyncEventArgs();
private readonly TaskCompletionSource<object> tcsAcc = new TaskCompletionSource<object>();
private TaskCompletionSource<object> tcsRcv;
private Socket socket;
internal DumReceiver(int port)
{
this.eva.Completed += this.Received;
var lstSock = new Socket(
AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var localIP = Dns.GetHostEntry(Dns.GetHostName()).AddressList
.First(i => i.AddressFamily == AddressFamily.InterNetwork);
lstSock.Bind(new IPEndPoint(localIP, port));
lstSock.Listen(1);
var saea = new SocketAsyncEventArgs();
saea.Completed += this.AcceptCompleted;
lstSock.AcceptAsync(saea);
}
internal Task Acception => this.tcsAcc.Task;
internal Task Completion { get; private set; }
internal int SegmentsRead { get; private set; }
private void AcceptCompleted(object sender, SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
this.socket = e.AcceptSocket;
e.Dispose();
try
{
this.Completion = this.ReceiveLupeAsync();
}
finally
{
this.tcsAcc.SetResult(null);
}
}
else
{
this.tcsAcc.SetException(new SocketException((int)e.SocketError));
}
}
private async Task ReceiveLupeAsync()
{
var buf = new byte[8196];
byte bufSeg = 1;
int pos = 0;
while (true)
{
this.tcsRcv = new TaskCompletionSource<object>();
this.eva.SetBuffer(buf, pos, 8196 - pos);
if (this.socket.ReceiveAsync(this.eva))
{
await this.tcsRcv.Task.ConfigureAwait(false);
}
if (this.eva.SocketError != SocketError.Success)
{
throw new SocketException((int)eva.SocketError);
}
if (this.eva.BytesTransferred == 0)
{
if (pos != 0)
{
throw new EndOfStreamException();
}
break;
}
pos += this.eva.BytesTransferred;
if (pos == 8196)
{
pos = 0;
for (int i = 0; i < 8196; i++)
{
if (buf[i] != bufSeg)
{
var msg = $"Expected {bufSeg} but read {buf[i]} ({i} of 8196). " +
$"Last read: {this.eva.BytesTransferred}.";
Console.WriteLine(msg);
throw new Exception(msg);
}
}
this.SegmentsRead++;
bufSeg = (byte)(this.SegmentsRead + 1);
}
}
}
private void Received(object s, SocketAsyncEventArgs e) => this.tcsRcv.SetResult(null);
}
class DumSender
{
private readonly SocketAsyncEventArgs eva = new SocketAsyncEventArgs();
private readonly Socket socket = new Socket(
AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
private readonly TaskCompletionSource<object> tcsCon = new TaskCompletionSource<object>();
private TaskCompletionSource<object> tcsSnd;
internal DumSender(int port)
{
this.eva.Completed += this.Sent;
var saea = new SocketAsyncEventArgs();
var localIP = Dns.GetHostEntry(Dns.GetHostName()).AddressList
.First(i => i.AddressFamily == AddressFamily.InterNetwork);
saea.RemoteEndPoint = new IPEndPoint(localIP, port);
saea.Completed += this.ConnectionCompleted;
this.socket.ConnectAsync(saea);
}
internal Task Connection => this.tcsCon.Task;
internal Task Completion { get; private set; }
internal int SegmentsSent { get; private set; }
private void ConnectionCompleted(object sender, SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
e.Dispose();
try
{
this.Completion = this.SendLupeAsync();
}
finally
{
this.tcsCon.SetResult(null);
}
}
else
{
this.tcsCon.SetException(new SocketException((int)e.SocketError));
}
}
private async Task SendLupeAsync()
{
var buf = new byte[8196];
byte bufSeg = 1;
while (true)
{
for (int i = 0; i < 8196; i++)
{
buf[i] = bufSeg;
}
this.tcsSnd = new TaskCompletionSource<object>();
this.eva.SetBuffer(buf, 0, 8196);
if (this.socket.SendAsync(this.eva))
{
await this.tcsSnd.Task.ConfigureAwait(false);
}
if (this.eva.SocketError != SocketError.Success)
{
throw new SocketException((int)this.eva.SocketError);
}
if (this.eva.BytesTransferred != 8196)
{
throw new SocketException();
}
if (++this.SegmentsSent == 299)
{
break;
}
bufSeg = (byte)(this.SegmentsSent + 1);
}
this.socket.Shutdown(SocketShutdown.Both);
}
private void Sent(object s, SocketAsyncEventArgs e) => this.tcsSnd.SetResult(null);
}

我相信问题出在您的代码中。

您必须检查使用SocketAsyncEventArgsSocket*Async方法的返回。 如果它们返回false,则不会引发SocketAsyncEventArgs.Completed事件,您必须同步处理结果。

参考文档:SocketAsyncEventArgs Class。 搜索willRaiseEvent

DumReceiver的构造函数中,您不检查AcceptAsync的结果,也不会在同步完成时处理这种情况。

DumSender的构造函数中,您不检查ConnectAsync的结果,也不会在同步完成时处理这种情况。

最重要的是,SocketAsyncEventArgs.Completed事件可能会在其他线程中引发,很可能是来自ThreadPool的 I/O 线程。

每次在没有正确同步的情况下分配给DumReceiver.tcsRcvDumSender.tcsSnd时,都无法确定DumReceiver.ReceivedDumSender.Sent使用的是最新的TaskCompletionSource

实际上,您可以在第一次迭代时获得NullReferenceException

您缺少以下方面的同步:

  • DumReceiver,字段tcsRcvsocket以及属性CompletionSegmentsRead

  • DumSender,字段tcsSnd和属性CompletionSegmentsSent

我建议您考虑使用单个SemaphoreSlim,而不是在每次调用ReceiveAsyncSendAsync时都创建新的TaskCompletionSource。 您需要在构造函数中将信号量初始化为 0。 如果*Async操作挂起,则await WaitAsync信号量,Completed事件将Release信号量。

这应该足以摆脱TaskCompletionSource领域的竞争条件。 您仍然需要对其他字段和属性进行适当的同步。 例如,没有理由不能在构造函数中创建Completion,并且SegmentsReadSegmentsSent可以是只读的,并引用可以使用一个或多个Interlocked方法在内部访问的字段(例如Interlocked.IncrementInterlocked.Add(。

相关内容

  • 没有找到相关文章

最新更新