Fastest way to split a stream by a pattern


What is the best/fastest way to split a Stream into chunks that are delimited by a byte pattern, e.g. new byte[] { 0, 0 }?

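My current implementation is slow: it reads the stream one byte at a time and decrements a counter each time it hits the delimiter byte. When the counter reaches zero, it yields a chunk of memory.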

const int NUMBER_CONSECUTIVE_DELIMITER = 2;
const int DELIMITER = 0;

public IEnumerable<ReadOnlyMemory<byte>> Chunk(Stream stream)
{
    var chunk = new MemoryStream();
    try
    {
        int b; // the byte being read
        int c = NUMBER_CONSECUTIVE_DELIMITER; // delimiter bytes still needed to close the chunk
        while ((b = stream.ReadByte()) != -1) // read the stream byte by byte; -1 = end of the stream
        {
            chunk.WriteByte((byte)b); // write this byte to the current chunk
            if (b == DELIMITER)
                c--; // we hit the delimiter byte (i.e. 0): decrement the counter
            else
                c = NUMBER_CONSECUTIVE_DELIMITER; // otherwise, reset the counter
            if (c <= 0 || stream.Position == stream.Length) // two consecutive 0s, or end of a seekable stream
            {
                var r = chunk.ToArray().AsMemory(); // copy the chunk into a Memory<byte>
                chunk.Dispose();
                chunk = new();
                c = NUMBER_CONSECUTIVE_DELIMITER; // start counting afresh for the next chunk
                yield return r;
            }
        }
    }
    finally
    {
        chunk.Dispose();
    }
}

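Such an implementation is extremely difficult to get right, because the stream has to be read with a fixed buffer size. The buffer may be too large or too small for the content, and a delimiter can be cut in half at a buffer boundary. The ReadOnlySequence<T> struct was added to solve this problem. More information on this topic can be found here.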
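To illustrate the buffer-boundary problem, here is a minimal sketch that builds a two-segment ReadOnlySequence<byte> by hand, with the delimiter { 0, 0 } split across the two segments, and searches it with SequenceReader<byte>.TryReadTo. The Segment class, the Demo class and its method names are made up for this illustration and are not part of the answer.

```csharp
using System;
using System.Buffers;

// Hypothetical helper: a minimal linked segment, needed to build a multi-segment sequence by hand.
sealed class Segment : ReadOnlySequenceSegment<byte>
{
    public Segment(ReadOnlyMemory<byte> memory) => Memory = memory;

    // Links a new segment after this one and returns it.
    public Segment Append(ReadOnlyMemory<byte> memory)
    {
        var next = new Segment(memory) { RunningIndex = RunningIndex + Memory.Length };
        Next = next;
        return next;
    }
}

static class Demo
{
    // Simulates two consecutive fixed-size reads as one logical sequence.
    public static ReadOnlySequence<byte> TwoSegments(byte[] a, byte[] b)
    {
        var first = new Segment(a);
        Segment last = first.Append(b);
        return new ReadOnlySequence<byte>(first, 0, last, last.Memory.Length);
    }

    // Returns the length of the first chunk, or -1 if the delimiter is not found.
    public static long FirstChunkLength(ReadOnlySequence<byte> data, ReadOnlySpan<byte> delimiter)
    {
        var reader = new SequenceReader<byte>(data);
        return reader.TryReadTo(out ReadOnlySequence<byte> chunk, delimiter) ? chunk.Length : -1;
    }

    public static void Main()
    {
        // The delimiter { 0, 0 } is cut in half by the segment boundary.
        ReadOnlySequence<byte> data = TwoSegments(new byte[] { 1, 2, 0 }, new byte[] { 0, 3, 4 });
        Console.WriteLine(FirstChunkLength(data, new byte[] { 0, 0 }));
    }
}
```

A search that only looks at one array at a time would miss this delimiter; treating the segments as one sequence avoids having to stitch buffers together manually.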

With System.IO.Pipelines (the package has to be installed), the problem can be solved as follows:

// Requires: using System.IO; using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks;
public static async Task FillPipeAsync(Stream stream, PipeWriter writer, CancellationToken cancellationToken = default)
{
    // The minimum buffer size that is used for the current buffer segment.
    const int bufferSize = 65536;
    while (true)
    {
        // Request at least 65536 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(bufferSize);
        // Read the content from the stream.
        int bytesRead = await stream.ReadAsync(memory, cancellationToken).ConfigureAwait(false);
        if (bytesRead == 0) break;
        // Tell the writer how many bytes were written into its memory.
        writer.Advance(bytesRead);
        // Make the written data available to the PipeReader.
        FlushResult result = await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
        if (result.IsCompleted) break;
    }
    // This notifies the reading side that no more data is coming.
    await writer.CompleteAsync().ConfigureAwait(false);
}

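This reads the stream asynchronously and writes the buffer segments into the pipe. Next, you have to implement the read logic that slices/merges the linked buffer segments into chunks: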

// Requires: using System.Buffers; using System.Collections.Generic; using System.IO.Pipelines;
// using System.Runtime.CompilerServices; using System.Threading;
public static async IAsyncEnumerable<ReadOnlySequence<byte>> ReadPipeAsync(PipeReader reader, ReadOnlyMemory<byte> delimiter,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    while (true)
    {
        // Read from the PipeReader.
        ReadResult result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
        ReadOnlySequence<byte> buffer = result.Buffer;
        while (TryReadChunk(ref buffer, delimiter.Span, out ReadOnlySequence<byte> chunk))
            yield return chunk;
        // Handle the completion notification: return whatever is left as the last chunk
        // while the buffer is still valid, i.e. before advancing or completing the reader.
        if (result.IsCompleted)
        {
            if (!buffer.IsEmpty)
                yield return buffer;
            break;
        }
        // Tell the PipeReader how much was consumed (buffer.Start) and examined (buffer.End).
        // This is essential because the Pipe releases buffer segments that are no longer in use.
        reader.AdvanceTo(buffer.Start, buffer.End);
    }
    await reader.CompleteAsync().ConfigureAwait(false);
}

private static bool TryReadChunk(ref ReadOnlySequence<byte> buffer, ReadOnlySpan<byte> delimiter,
    out ReadOnlySequence<byte> chunk)
{
    // SequenceReader<byte>.TryReadTo searches for the whole delimiter, keeps looking after a
    // partial match, and also finds a delimiter that straddles two buffer segments.
    var sequenceReader = new SequenceReader<byte>(buffer);
    if (!sequenceReader.TryReadTo(out chunk, delimiter, advancePastDelimiter: true))
    {
        // No complete delimiter in the buffer yet: wait for more data.
        chunk = default;
        return false;
    }
    // Cut the chunk and its delimiter off the start of the buffer.
    buffer = buffer.Slice(sequenceReader.Position);
    return true;
}

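For this to work in this form, you have to use IAsyncEnumerable so that the chunks can be streamed into the foreach loop. Merging and slicing are largely handled by the pipe, so a robust algorithm can be built with relatively little code. The code also handles the problem efficiently.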

Usage

// Create a Pipe that manages the buffer.
Pipe pipe = new Pipe();
// Start filling the pipe; the task is awaited after the read loop.
ConfiguredTaskAwaitable writing = FillPipeAsync(stream, pipe.Writer).ConfigureAwait(false);
// The delimiter that should be used. This can be any data with length > 0.
ReadOnlyMemory<byte> delimiter = new ReadOnlyMemory<byte>(new byte[] { 0, 0 });
// 'await foreach' and the writing task run concurrently.
await foreach (ReadOnlySequence<byte> chunk in ReadPipeAsync(pipe.Reader, delimiter))
{
    // Use "chunk" to retrieve your chunked content.
    // It is only valid until the next iteration, so copy it if you need to keep it.
}
await writing;

Note that reading and chunking are performed asynchronously and independently of each other.
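When the data comes straight from a Stream, the fill step can also be left to the library: PipeReader.Create(Stream) wraps a stream in a PipeReader. The following is a rough, self-contained sketch of the same splitting loop on top of it, not the answer's own code. The names PipeReaderDemo and SplitAsync are made up for illustration, and each chunk is copied to an array so that it outlives the pipe's buffers.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

static class PipeReaderDemo
{
    // Splits the stream on the delimiter and returns each chunk copied into its own array.
    public static async Task<List<byte[]>> SplitAsync(Stream stream, byte[] delimiter)
    {
        var chunks = new List<byte[]>();
        PipeReader reader = PipeReader.Create(stream);
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;
            while (TryReadChunk(ref buffer, delimiter, out ReadOnlySequence<byte> chunk))
                chunks.Add(chunk.ToArray());
            if (result.IsCompleted)
            {
                // Whatever is left after the last delimiter is the final chunk.
                if (!buffer.IsEmpty)
                    chunks.Add(buffer.ToArray());
                break;
            }
            // Nothing before buffer.Start is needed again; everything up to buffer.End was examined.
            reader.AdvanceTo(buffer.Start, buffer.End);
        }
        await reader.CompleteAsync();
        return chunks;
    }

    private static bool TryReadChunk(ref ReadOnlySequence<byte> buffer, ReadOnlySpan<byte> delimiter,
        out ReadOnlySequence<byte> chunk)
    {
        var sequenceReader = new SequenceReader<byte>(buffer);
        if (!sequenceReader.TryReadTo(out chunk, delimiter))
            return false;
        buffer = buffer.Slice(sequenceReader.Position);
        return true;
    }

    public static async Task Main()
    {
        var data = new byte[] { 1, 2, 0, 0, 3, 0, 4, 0, 0, 5 };
        List<byte[]> chunks = await SplitAsync(new MemoryStream(data), new byte[] { 0, 0 });
        foreach (byte[] chunk in chunks)
            Console.WriteLine(BitConverter.ToString(chunk));
    }
}
```

Copying every chunk costs an allocation per chunk; if the consumer can process a chunk immediately, yielding the ReadOnlySequence<byte> as in ReadPipeAsync avoids that.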

I ended up with the code below, inspired by Philipp's answer above and by https://keestalkstech.com/2010/11/seek-position-of-a-string-in-a-file-or-filestream/.

// bufferSize, MinChunkSize, Delimiter and Concat are members of the containing class (not shown).
// The stream must be seekable, because Position and Length are used.
public override IEnumerable<byte[]> Chunk(Stream stream)
{
    var buffer = new byte[bufferSize];
    var size = bufferSize;
    var offset = 0;
    var position = stream.Position;
    var nextChunk = Array.Empty<byte>();
    while (true)
    {
        var bytesRead = stream.Read(buffer, offset, size);
        // when no bytes are read, the end of the stream has been reached
        if (bytesRead <= 0)
            break;
        // only search the part of the buffer that holds valid data, so that stale bytes from a
        // previous pass are ignored (a span local in an iterator would not compile before C# 13,
        // so the span is created inline)
        var i = new ReadOnlySpan<byte>(buffer, 0, offset + bytesRead).IndexOf(Delimiter);
        if (i > -1 &&  // we found something
            i <= bytesRead &&  // the match lies in the area that was just read; i == bytesRead if the delimiter is at the end of the buffer
            nextChunk.Length + (i + Delimiter.Length - offset) >= MinChunkSize)  // the chunk that will be made is large enough
        {
            var chunk = buffer[offset..(i + Delimiter.Length)];
            yield return Concat(nextChunk, chunk);
            nextChunk = Array.Empty<byte>();
            offset = 0;
            size = bufferSize;
            // continue reading right after the delimiter
            position += i + Delimiter.Length;
            stream.Position = position;
            continue;
        }
        else if (stream.Position == stream.Length)
        {
            // we're at the end of the stream
            var chunk = buffer[offset..(bytesRead + offset)]; // return the bytes read
            yield return Concat(nextChunk, chunk);
            break;
        }
        // the stream is not finished. Copy the last Delimiter.Length bytes to the beginning of the
        // buffer and set the offset so that the next read fills the buffer after them
        nextChunk = Concat(nextChunk, buffer[offset..buffer.Length]);
        offset = Delimiter.Length;
        size = bufferSize - offset;
        Array.Copy(buffer, buffer.Length - offset, buffer, 0, offset);
        position += bufferSize - offset;
    }
}
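The method above relies on a Concat helper that the answer does not show. A minimal sketch, assuming it simply joins two byte arrays into a new one; the signature is a guess based on how it is called, and the ChunkHelpers class name is made up:

```csharp
using System;

static class ChunkHelpers
{
    // Hypothetical helper: returns a new array holding `first` followed by `second`.
    public static byte[] Concat(byte[] first, byte[] second)
    {
        var result = new byte[first.Length + second.Length];
        Buffer.BlockCopy(first, 0, result, 0, first.Length);
        Buffer.BlockCopy(second, 0, result, first.Length, second.Length);
        return result;
    }

    public static void Main()
    {
        byte[] joined = Concat(new byte[] { 1, 2 }, new byte[] { 0, 0 });
        Console.WriteLine(BitConverter.ToString(joined)); // prints 01-02-00-00
    }
}
```

Each call allocates a new array, so a chunk that spans many buffers is copied repeatedly; that is acceptable for a sketch but worth keeping in mind when chunks are large.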
