我有一个可读取的System.IO.Stream
实现,该实现是无法寻求的(并且其Position
始终返回0)。我需要将其发送给在流上执行一些Seek
操作(又称位置)的消费者。这不是一个巨大的寻求者 - 从当前位置说/- 100。是否存在现有的Stream
包装器,可以在流中添加缓冲能力以进行简单搜索操作?
更新:我应该补充说,我的消费者是Naudio mp3filereader。我真的只需要一种播放(缓慢而无限期地)流媒体mp3的方法。我认为这是Naudio希望能够随意寻求其数据源的错误。
寻求向前很容易(只需阅读),但是如果不缓冲而无法向后寻找。也许只是:
using(var ms = new MemoryStream()) {
otherStream.CopyTo(ms);
ms.Position = 0;
// now work with ms
}
但是,这仅适用于已知结束的小型到中度流(而非GB)(不需要这流)。如果您需要较大的流,则 FileStream
到temp-file将起作用,但要大量IO密集型。
以下是一个包装器,可以使任何Stream
可寻求读取操作。
它是通过缓存从底层流读取的,直到构造函数中指定的字节数。当记忆约束禁止Marc Gravell的解决方案时,这将派上用场。
支持的寻求操作:
- 使用
SeekOrigin.Current
和SeekOrigin.Begin
寻求前进,适用于任意偏移 - 使用
SeekOrigin.Current
和SeekOrigin.Begin
向后寻求向后从基础流中的当前位置下降到-seekBackBufferSize
字节(这可能与以前的向后搜索后的readSeekableStream.Position
有所不同) - 寻求使用
SeekOrigin.End
适用于offset >= -seekBackBufferSize && offset <= 0
一般言论
-
Seek
方法和Position
属性是完全处理的,并且不涉及基础流(无论如何只会抛出) - 寻求影响读取仅流的一部分,因此类名称
- 所有写操作都简单地委派给了基础流
- 包装已经可以寻求的流将是浪费资源
- 下面
ReadSeekableStream
解决的一些问题也可以通过我的PeekableStream
类解决
这种实现是新鲜的,尚未战斗。但是,我已经对其进行了相当多的搜索/读取案例和角案件进行了测试,并将其与(可自由寻求的)MemoryStream
进行了交叉比较。
public class ReadSeekableStream : Stream
{
private long _underlyingPosition;
private readonly byte[] _seekBackBuffer;
private int _seekBackBufferCount;
private int _seekBackBufferIndex;
private readonly Stream _underlyingStream;
public ReadSeekableStream(Stream underlyingStream, int seekBackBufferSize)
{
if (!underlyingStream.CanRead)
throw new Exception("Provided stream " + underlyingStream + " is not readable");
_underlyingStream = underlyingStream;
_seekBackBuffer = new byte[seekBackBufferSize];
}
public override bool CanRead { get { return true; } }
public override bool CanSeek { get { return true; } }
public override int Read(byte[] buffer, int offset, int count)
{
int copiedFromBackBufferCount = 0;
if (_seekBackBufferIndex < _seekBackBufferCount)
{
copiedFromBackBufferCount = Math.Min(count, _seekBackBufferCount - _seekBackBufferIndex);
Buffer.BlockCopy(_seekBackBuffer, _seekBackBufferIndex, buffer, offset, copiedFromBackBufferCount);
offset += copiedFromBackBufferCount;
count -= copiedFromBackBufferCount;
_seekBackBufferIndex += copiedFromBackBufferCount;
}
int bytesReadFromUnderlying = 0;
if (count > 0)
{
bytesReadFromUnderlying = _underlyingStream.Read(buffer, offset, count);
if (bytesReadFromUnderlying > 0)
{
_underlyingPosition += bytesReadFromUnderlying;
var copyToBufferCount = Math.Min(bytesReadFromUnderlying, _seekBackBuffer.Length);
var copyToBufferOffset = Math.Min(_seekBackBufferCount, _seekBackBuffer.Length - copyToBufferCount);
var bufferBytesToMove = Math.Min(_seekBackBufferCount - 1, copyToBufferOffset);
if (bufferBytesToMove > 0)
Buffer.BlockCopy(_seekBackBuffer, _seekBackBufferCount - bufferBytesToMove, _seekBackBuffer, 0, bufferBytesToMove);
Buffer.BlockCopy(buffer, offset, _seekBackBuffer, copyToBufferOffset, copyToBufferCount);
_seekBackBufferCount = Math.Min(_seekBackBuffer.Length, _seekBackBufferCount + copyToBufferCount);
_seekBackBufferIndex = _seekBackBufferCount;
}
}
return copiedFromBackBufferCount + bytesReadFromUnderlying;
}
public override long Seek(long offset, SeekOrigin origin)
{
if (origin == SeekOrigin.End)
return SeekFromEnd((int) Math.Max(0, -offset));
var relativeOffset = origin == SeekOrigin.Current
? offset
: offset - Position;
if (relativeOffset == 0)
return Position;
else if (relativeOffset > 0)
return SeekForward(relativeOffset);
else
return SeekBackwards(-relativeOffset);
}
private long SeekForward(long origOffset)
{
long offset = origOffset;
var seekBackBufferLength = _seekBackBuffer.Length;
int backwardSoughtBytes = _seekBackBufferCount - _seekBackBufferIndex;
int seekForwardInBackBuffer = (int) Math.Min(offset, backwardSoughtBytes);
offset -= seekForwardInBackBuffer;
_seekBackBufferIndex += seekForwardInBackBuffer;
if (offset > 0)
{
// first completely fill seekBackBuffer to remove special cases from while loop below
if (_seekBackBufferCount < seekBackBufferLength)
{
var maxRead = seekBackBufferLength - _seekBackBufferCount;
if (offset < maxRead)
maxRead = (int) offset;
var bytesRead = _underlyingStream.Read(_seekBackBuffer, _seekBackBufferCount, maxRead);
_underlyingPosition += bytesRead;
_seekBackBufferCount += bytesRead;
_seekBackBufferIndex = _seekBackBufferCount;
if (bytesRead < maxRead)
{
if (_seekBackBufferCount < offset)
throw new NotSupportedException("Reached end of stream seeking forward " + origOffset + " bytes");
return Position;
}
offset -= bytesRead;
}
// now alternate between filling tempBuffer and seekBackBuffer
bool fillTempBuffer = true;
var tempBuffer = new byte[seekBackBufferLength];
while (offset > 0)
{
var maxRead = offset < seekBackBufferLength ? (int) offset : seekBackBufferLength;
var bytesRead = _underlyingStream.Read(fillTempBuffer ? tempBuffer : _seekBackBuffer, 0, maxRead);
_underlyingPosition += bytesRead;
var bytesReadDiff = maxRead - bytesRead;
offset -= bytesRead;
if (bytesReadDiff > 0 /* reached end-of-stream */ || offset == 0)
{
if (fillTempBuffer)
{
if (bytesRead > 0)
{
Buffer.BlockCopy(_seekBackBuffer, bytesRead, _seekBackBuffer, 0, bytesReadDiff);
Buffer.BlockCopy(tempBuffer, 0, _seekBackBuffer, bytesReadDiff, bytesRead);
}
}
else
{
if (bytesRead > 0)
Buffer.BlockCopy(_seekBackBuffer, 0, _seekBackBuffer, bytesReadDiff, bytesRead);
Buffer.BlockCopy(tempBuffer, bytesRead, _seekBackBuffer, 0, bytesReadDiff);
}
if (offset > 0)
throw new NotSupportedException("Reached end of stream seeking forward " + origOffset + " bytes");
}
fillTempBuffer = !fillTempBuffer;
}
}
return Position;
}
private long SeekBackwards(long offset)
{
var intOffset = (int)offset;
if (offset > int.MaxValue || intOffset > _seekBackBufferIndex)
throw new NotSupportedException("Cannot currently seek backwards more than " + _seekBackBufferIndex + " bytes");
_seekBackBufferIndex -= intOffset;
return Position;
}
private long SeekFromEnd(long offset)
{
var intOffset = (int) offset;
var seekBackBufferLength = _seekBackBuffer.Length;
if (offset > int.MaxValue || intOffset > seekBackBufferLength)
throw new NotSupportedException("Cannot seek backwards from end more than " + seekBackBufferLength + " bytes");
// first completely fill seekBackBuffer to remove special cases from while loop below
if (_seekBackBufferCount < seekBackBufferLength)
{
var maxRead = seekBackBufferLength - _seekBackBufferCount;
var bytesRead = _underlyingStream.Read(_seekBackBuffer, _seekBackBufferCount, maxRead);
_underlyingPosition += bytesRead;
_seekBackBufferCount += bytesRead;
_seekBackBufferIndex = Math.Max(0, _seekBackBufferCount - intOffset);
if (bytesRead < maxRead)
{
if (_seekBackBufferCount < intOffset)
throw new NotSupportedException("Could not seek backwards from end " + intOffset + " bytes");
return Position;
}
}
else
{
_seekBackBufferIndex = _seekBackBufferCount;
}
// now alternate between filling tempBuffer and seekBackBuffer
bool fillTempBuffer = true;
var tempBuffer = new byte[seekBackBufferLength];
while (true)
{
var bytesRead = _underlyingStream.Read(fillTempBuffer ? tempBuffer : _seekBackBuffer, 0, seekBackBufferLength);
_underlyingPosition += bytesRead;
var bytesReadDiff = seekBackBufferLength - bytesRead;
if (bytesReadDiff > 0) // reached end-of-stream
{
if (fillTempBuffer)
{
if (bytesRead > 0)
{
Buffer.BlockCopy(_seekBackBuffer, bytesRead, _seekBackBuffer, 0, bytesReadDiff);
Buffer.BlockCopy(tempBuffer, 0, _seekBackBuffer, bytesReadDiff, bytesRead);
}
}
else
{
if (bytesRead > 0)
Buffer.BlockCopy(_seekBackBuffer, 0, _seekBackBuffer, bytesReadDiff, bytesRead);
Buffer.BlockCopy(tempBuffer, bytesRead, _seekBackBuffer, 0, bytesReadDiff);
}
_seekBackBufferIndex -= intOffset;
return Position;
}
fillTempBuffer = !fillTempBuffer;
}
}
public override long Position
{
get { return _underlyingPosition - (_seekBackBufferCount - _seekBackBufferIndex); }
set { Seek(value, SeekOrigin.Begin); }
}
protected override void Dispose(bool disposing)
{
if (disposing)
_underlyingStream.Close();
base.Dispose(disposing);
}
public override bool CanTimeout { get { return _underlyingStream.CanTimeout; } }
public override bool CanWrite { get { return _underlyingStream.CanWrite; } }
public override long Length { get { return _underlyingStream.Length; } }
public override void SetLength(long value) { _underlyingStream.SetLength(value); }
public override void Write(byte[] buffer, int offset, int count) { _underlyingStream.Write(buffer, offset, count); }
public override void Flush() { _underlyingStream.Flush(); }
}
我只使用亚马逊SDK的makestreamseekable方法:
makestreamseekable方法(输入)
将非可视流转换为system.io.memorystream。记忆中心的位置可以任意移动。
声明语法
c#
public static Stream MakeStreamSeekable(
Stream input
)
参数
*input* ([Stream][2])
要转换的流
返回值
可寻求的内存
备注
MemoryStreams使用字节数组作为其备份商店。请在司法上使用此信息,因为很可能会导致系统资源用完。
另一个解决方案可能是创建包裹另一个流的流类。实施寻找NOP。
class MyStream : Stream
{
public MyStream(Stream baseStream) { this.baseStream = baseStream; }
private Stream baseStream;
// Delegate all operations except Seek/CanSeek to baseStream
public override bool CanSeek { get { return true; } }
public override long Seek(long offset, SeekOrigin origin) { return baseStream.Position; }
}
如果玩家没有充分的理由寻求,这可能会起作用。
Microsoft Biztalk实现了可寻求的流。概念是它是具有输入流和缓冲流的流。输入流是不可看到的流,而Buff流是您想要使用的任何流类型的流类型。因此,每当您提前寻找或阅读它时,将数据复制到输入流的缓冲区。每当您返回并阅读一些您已经阅读的东西时,都会从缓冲区阅读。
很聪明,可以知道输入流是否可寻求并使用缓冲区跳过。
您可以找到许多地方。其中之一在这里,我在下面包括。该代码确实取决于缓冲流的虚拟流,该虚拟流将使用内存,直到缓冲区太大然后使用磁盘为止。您也可以为此获取代码,也可以轻松删除依赖关系。
//---------------------------------------------------------------------
// File: SeekableReadOnlyStream.cs
//
// Summary: A sample pipeline component which demonstrates how to promote message context
// properties and write distinguished fields for XML messages using arbitrary
// XPath expressions.
//
// Sample: Arbitrary XPath Property Handler Pipeline Component SDK
//
//---------------------------------------------------------------------
// This file is part of the Microsoft BizTalk Server 2006 SDK
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// This source code is intended only as a supplement to Microsoft BizTalk
// Server 2006 release and/or on-line documentation. See these other
// materials for detailed information regarding Microsoft code samples.
//
// THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY
// KIND, WHETHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR
// PURPOSE.
//---------------------------------------------------------------------
using System;
using System.IO;
using System.Diagnostics;
namespace Microsoft.Samples.BizTalk.Adapter.Tcp
{
/// <summary>
/// Implements a seekable read-only stream which uses buffering if
/// underlying stream is not seekable. Buffer in memory has size
/// threshold and overflows to disk (temporary file) if number of bytes.
/// </summary>
public class SeekableReadOnlyStream : Stream
{
/// <summary>
/// Initializes a SeekableReadOnlyStream instance with base stream and
/// buffering stream.
/// </summary>
/// <param name="baseStream">Base stream</param>
/// <param name="overflowStream">Buffering stream</param>
public SeekableReadOnlyStream(Stream baseStream, Stream bufferingStream)
{
if (null == baseStream)
throw new ArgumentNullException("baseStream");
if (null == bufferingStream)
throw new ArgumentNullException("bufferingStream");
// Sanity check - make sure that buffering stream is seekable
if (!bufferingStream.CanSeek)
throw new NotSupportedException("Buffering stream must be seekable");
this.baseStream = baseStream;
this.bufferingStream = bufferingStream;
}
/// <summary>
/// Initializes a SeekableReadOnlyStream instance with base stream and inherently uses
/// VirtualStream instance as buffering stream.
/// </summary>
/// <param name="baseStream">Base stream</param>
public SeekableReadOnlyStream(Stream baseStream) : this(baseStream, new VirtualStream())
{
// Empty
}
/// <summary>
/// Initializes a SeekableReadOnlyStream instance with base stream and buffer size, and
/// inherently uses VirtualStream instance as buffering stream.
/// </summary>
/// <param name="baseStream">Base stream</param>
/// <param name="bufferSize">Buffer size</param>
public SeekableReadOnlyStream(Stream baseStream, int bufferSize) : this(baseStream, new VirtualStream(bufferSize))
{
// Empty
}
/// <summary>
/// Gets a flag indicating whether this stream can be read.
/// </summary>
public override bool CanRead
{
get { return true; }
}
/// <summary>
/// Gets a flag indicating whether this stream can be written to.
/// </summary>
public override bool CanWrite
{
get { return false; }
}
public override bool CanSeek
{
get { return true; }
}
/// <summary>
/// Gets or sets a stream position.
/// </summary>
public override long Position
{
get
{
// Check if base stream is seekable
if (baseStream.CanSeek)
return baseStream.Position;
return bufferingStream.Position;
}
set
{
// Check if base stream is seekable
if (baseStream.CanSeek)
{
baseStream.Position = value;
return;
}
// Check if current position is the same as being set
if (bufferingStream.Position == value)
return;
// Check if stream position is being set to the value which is in already
// read to the buffering stream space, i.e. less than current buffering stream
// position or less than length of the buffering stream
if (value < bufferingStream.Position || value < bufferingStream.Length)
{
// Just change position in the buffering stream
bufferingStream.Position = value;
}
else
{
//
// Need to read buffer from the base stream from the current position in
// base stream to the position being set and write that buffer to the end
// of the buffering stream
//
// Set position to the last byte in the buffering stream
bufferingStream.Seek(0, SeekOrigin.End);
// Read buffer from the base stream and write it to the buffering stream
// in 4K chunks
byte [] buffer = new byte[ 4096 ];
long bytesToRead = value - bufferingStream.Position;
while (bytesToRead > 0)
{
// Read to buffer 4K or byteToRead, whichever is less
int bytesRead = baseStream.Read(buffer, 0, (int) Math.Min(bytesToRead, buffer.Length));
// Check if any bytes were read
if (0 == bytesRead)
break;
// Write read bytes to the buffering stream
bufferingStream.Write(buffer, 0, bytesRead);
// Decrease bytes to read counter
bytesToRead -= bytesRead;
}
//
// Since this stream is not writable, any attempt to point Position beyond the length
// of the base stream will not succeed, and buffering stream position will be set to the
// last byte in the buffering stream.
//
}
}
}
/// <summary>
/// Seeks in stream. For this stream can be very expensive because entire base stream
/// can be dumped into buffering stream if SeekOrigin.End is used.
/// </summary>
/// <param name="offset">A byte offset relative to the origin parameter</param>
/// <param name="origin">A value of type SeekOrigin indicating the reference point used to obtain the new position</param>
/// <returns>The new position within the current stream</returns>
public override long Seek(long offset, SeekOrigin origin)
{
// Check if base stream is seekable
if (baseStream.CanSeek)
return baseStream.Seek(offset, origin);
if (SeekOrigin.Begin == origin)
{
// Just set the absolute position using Position property
Position = offset;
return Position;
}
if (SeekOrigin.Current == origin)
{
// Set the position using current Position property value plus offset
Position = Position + offset;
return Position;
}
if (SeekOrigin.End == origin)
{
//
// Need to read all remaining not read bytes from the base stream to the
// buffering stream. We can't use offset here because stream size may not
// be available because it's not seekable. Then we'll set the position
// based on buffering stream size.
//
// Set position to the last byte in the buffering stream
bufferingStream.Seek(0, SeekOrigin.End);
// Read all remaining bytes from the base stream to the buffering stream
byte [] buffer = new byte[ 4096 ];
for (;;)
{
// Read buffer from base stream
int bytesRead = baseStream.Read(buffer, 0, buffer.Length);
// Break the reading loop if the base stream is exhausted
if (0 == bytesRead)
break;
// Write buffer to the buffering stream
bufferingStream.Write(buffer, 0, bytesRead);
}
// Now buffering stream size is equal to the base stream size. Set position
// using begin origin
Position = bufferingStream.Length - offset;
return Position;
}
throw new NotSupportedException("Not supported SeekOrigin");
}
/// <summary>
/// Gets the length in bytes of the stream. For this stream can be very expensive
/// because entire base stream will be dumped into buffering stream.
/// </summary>
public override long Length
{
get
{
// Check if base stream is seekable
if (baseStream.CanSeek)
return baseStream.Length;
// Preserve the current stream position
long position = Position;
// Seek to the end of stream
Seek(0, SeekOrigin.End);
// Length will be equal to the current position
long length = Position;
// Restore the current stream position
Position = position;
return length;
}
}
/// <summary>
/// Reads a sequence of bytes from the current stream and advances the position within the stream by the number of bytes read.
/// </summary>
/// <param name="buffer">An array of bytes. When this method returns, the buffer contains the specified byte array with the values between offset and (offset + count- 1) replaced by the bytes read from the current source</param>
/// <param name="offset">The zero-based byte offset in buffer at which to begin storing the data read from the current stream</param>
/// <param name="count">The maximum number of bytes to be read from the current stream</param>
/// <returns>The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero (0) if the end of the stream has been reached</returns>
public override int Read(byte[] buffer, int offset, int count)
{
// Check if base stream is seekable
if (baseStream.CanSeek)
return baseStream.Read(buffer, offset, count);
int bytesReadTotal = 0;
// Check if buffering stream has some bytes to read, starting from the
// current position
if (bufferingStream.Length > bufferingStream.Position)
{
// Read available bytes in buffering stream or count bytes to the buffer, whichever is less
bytesReadTotal = bufferingStream.Read(buffer, offset, (int) Math.Min(bufferingStream.Length - bufferingStream.Position, count));
// Account for bytes read from the buffering stream
count -= bytesReadTotal;
offset += bytesReadTotal;
}
// Check if we have any more bytes to read
if (count > 0)
{
Debug.Assert(bufferingStream.Position == bufferingStream.Length);
//
// At this point, buffering stream has position set to its end. We need to read buffer from
// the base stream and write it to the buffering stream
//
// Read count bytes from the base stream starting from offset
int bytesRead = baseStream.Read(buffer, offset, count);
// Check if bytes were really read
if (bytesRead > 0)
{
// Write number of read bytes to the buffering stream starting from offset in buffer
bufferingStream.Write(buffer, offset, bytesRead);
}
// Add number of bytes read at this step to the number of totally read bytes
bytesReadTotal += bytesRead;
}
return bytesReadTotal;
}
/// <summary>
/// Writes to stream.
/// </summary>
/// <param name="buffer">Buffer to write to stream</param>
/// <param name="offset">Stream offset to start write from</param>
/// <param name="count">Number of bytes from buffer to write</param>
/// <exception cref="NotSupportedException">Is thrown always</exception>
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
/// <summary>
/// Set stream length.
/// </summary>
/// <param name="value">Stream length</param>
/// <exception cref="NotSupportedException">Is thrown always</exception>
public override void SetLength(long value)
{
throw new NotSupportedException();
}
/// <summary>
/// Closes base and buffering streams.
/// </summary>
public override void Close()
{
// Close underlying streams
baseStream.Close();
bufferingStream.Close();
}
/// <summary>
/// Flushes the stream.
/// </summary>
public override void Flush()
{
// Flush the buffering stream
bufferingStream.Flush();
}
private Stream baseStream;
private Stream bufferingStream;
}
}
如果使用System.Net.WebClient
,而不是使用返回Stream
的OpenRead()
使用webClient.DownloadData("https://your.url")
来获取一个字节数组,然后您可以将其变成MemoryStream
。这是一个示例:
byte[] buffer = client.DownloadData(testBlobFile);
using (var stream = new MemoryStream(buffer))
{
... your code using the stream ...
}
显然,在创建Stream
之前,这会下载所有内容,因此它可能会失败使用Stream
。