如何使用Kestrel ConnectionHandler处理传入的TCP消息



我想为我的.NET Core项目创建一个TCP侦听器。我正在使用Kestrel,并通过为此配置了一个新的ConnectionHandler

kestrelServerOptions.ListenLocalhost(5000, builder =>
{
builder.UseConnectionHandler<MyTCPConnectionHandler>();
});

到目前为止,我拥有的是

internal class MyTCPConnectionHandler : ConnectionHandler
{
public override async Task OnConnectedAsync(ConnectionContext connection)
{
IDuplexPipe pipe = connection.Transport;
PipeReader pipeReader = pipe.Input;
while (true)
{
ReadResult readResult = await pipeReader.ReadAsync();
ReadOnlySequence<byte> readResultBuffer = readResult.Buffer;
foreach (ReadOnlyMemory<byte> segment in readResultBuffer)
{
// read the current message
string messageSegment = Encoding.UTF8.GetString(segment.Span);
// send back an echo
await pipe.Output.WriteAsync(segment);
}
if (readResult.IsCompleted)
{
break;
}
pipeReader.AdvanceTo(readResultBuffer.Start, readResultBuffer.End);
}
}
}

当从TCP客户端向服务器应用程序发送消息时,代码运行良好。线路await pipe.Output.WriteAsync(segment);现在的作用就像回声。

出现了一些问题

  • 我应该向客户端发回什么响应,这样它就不会超时
  • 我应该什么时候发回回复?readResult.IsCompleted何时返回true
  • 我应该如何更改代码以获取客户端发送的全部消息?我应该将每个messageSegment存储在List<string>中,并在readResult.IsCompleted返回true时将其连接到单个字符串中吗
完全依赖于协议的
  1. ;在很多情况下,你什么都不做是可以的;在其他情况下;ping"/"pong";如果你只想说";我还在这里">
  2. ";当";完全依赖于协议;等待readResult.IsCompleted意味着您正在等待入站套接字被标记为关闭,这意味着您在客户端关闭其出站套接字之前不会发送任何内容;对于单次协议,这可能没问题;但在大多数情况下,您需要查找单个入站帧,并回复该帧(然后重复(
  3. 听起来你可能确实在写一个一次性通道,即客户端只向服务器发送一件事,然后:服务器只向客户端发送一件事情在这种情况下,您会执行以下操作:
while (true)
{
var readResult = await pipeReader.ReadAsync();
if (readResult.IsCompleted)
{
// TODO: not shown; process readResult.Buffer
// tell the pipe that we consumed everything, and exit
pipeReader.AdvanceTo(readResultBuffer.End, readResultBuffer.End);
break;
}
else
{
// wait for the client to close their outbound; tell
// the pipe that we couldn't consume anything
pipeReader.AdvanceTo(readResultBuffer.Start, readResultBuffer.End);
}

关于:

当时,我应该将每个messageSegment存储在List<string>中并将其连接到单个字符串中吗

这里首先要考虑的是,不一定每个缓冲区段都包含确切数量的字符。由于您使用的是UTF-8,这是一种多字节编码,因此一个段的开头和结尾可能包含部分字符,因此:解码它要比这更复杂一些。

因此,在缓冲器上检查IsSingleSegment是很常见的;如果这是真的,你可以使用简单的代码:

if (buffer.IsSingleSegment)
{
string message = Encoding.UTF8.GetString(s.FirstSpan);
DoSomethingWith(message);
}
else
{
// ... more complex
}

缓冲区不连续的情况要困难得多;基本上,你有两个选择:

  1. 将段线性化为一个连续的缓冲区,可能从ArrayPool<byte>.Shared租赁了一个过大的缓冲区并在租赁缓冲区的正确部分上使用UTF8.GetString
  2. 在编码上使用GetDecoder()API,并使用它来填充新字符串,这在较旧的框架中意味着覆盖新分配的字符串,或者在较新的框架中则意味着使用string.CreateAPI

坦率地说;1〃;要简单得多。例如(未经测试(:

public static string GetString(in this ReadOnlySequence<byte> payload,
Encoding encoding = null)
{
encoding ??= Encoding.UTF8;
return payload.IsSingleSegment ? encoding.GetString(payload.FirstSpan)
: GetStringSlow(payload, encoding);
static string GetStringSlow(in ReadOnlySequence<byte> payload, Encoding encoding)
{
// linearize
int length = checked((int)payload.Length);
var oversized = ArrayPool<byte>.Shared.Rent(length);
try
{
payload.CopyTo(oversized);
return encoding.GetString(oversized, 0, length);
}
finally
{
ArrayPool<byte>.Shared.Return(oversized);
}
}
}

最新更新