C# 缓冲的 Zip 流代理



我需要通过http发送非常大的zip文件。文件太大,无法写入文件系统或内存。我想即时压缩并通过 http 输出流发回。我正在使用仅将流作为响应的自定义 Web 服务器,我无法访问 http 响应输出流。(响应。发送(流))。

我的想法是创建一个可以传递到响应中的帮助程序/代理流。发送方法。(请参阅下面的 ZipCreateStream)基本上,这封装了一个内存流,要压缩的文件和存档。响应。发送方法将缓冲复制在调用流中传递的流。读取,ZipCreateStream 实现了一个 Read 方法,该方法将缓冲的块压缩到它的内存流(存档)中,然后将其复制到响应传入的缓冲区。发送。

这很好用,直到我尝试在 ZipCreateStream Read 方法中截断内存流。我试图做的是保持较低的内存使用率,在复制临时流中的字节后,我想"清除"内存流。当我这样做时,zip文件最终损坏了。

任何帮助/想法将不胜感激!我真的被这个难住了。

注意:我正在使用SharpZipLib进行压缩。

public class ZipCreateStream : Stream
{
    readonly string[] entries;
    readonly string[] files;
    ZipOutputStream archive;
    ZipEntry archiveEntry;
    int fileNumber;
    Stream fileStream;
    volatile MemoryStream stream;
    bool hasDisposed;
    int streamOffset;
    readonly bool truncate;
    public ZipCreateStream(string[] files, string[] entries, int compressionLevel = 0, bool truncate = false)
    {
        this.files = files;
        this.entries = entries;
        if (files.Length != entries.Length)
        {
            throw new ArgumentException("Files and entries mismatch");
        }
        this.truncate = truncate;
        stream = new MemoryStream();
        archive = new ZipOutputStream(stream);
        archive.SetLevel(compressionLevel);
    }
    public override bool CanRead
    {
        get
        {
            return true;
        }
    }
    /// <summary>
    ///     When overridden in a derived class, gets a value indicating whether the current stream supports seeking.
    /// </summary>
    /// <returns>
    ///     true if the stream supports seeking; otherwise, false.
    /// </returns>
    public override bool CanSeek
    {
        get
        {
            return false;
        }
    }
    /// <summary>
    ///     When overridden in a derived class, gets a value indicating whether the current stream supports writing.
    /// </summary>
    /// <returns>
    ///     true if the stream supports writing; otherwise, false.
    /// </returns>
    public override bool CanWrite
    {
        get
        {
            return false;
        }
    }
    /// <summary>
    ///     When overridden in a derived class, gets the length in bytes of the stream.
    /// </summary>
    /// <returns>
    ///     A long value representing the length of the stream in bytes.
    /// </returns>
    /// <exception cref="T:System.NotSupportedException">A class derived from Stream does not support seeking. </exception>
    /// <exception cref="T:System.ObjectDisposedException">Methods were called after the stream was closed. </exception>
    public override long Length
    {
        get
        {
            throw new NotSupportedException();
        }
    }
    /// <summary>
    ///     When overridden in a derived class, gets or sets the position within the current stream.
    /// </summary>
    /// <returns>
    ///     The current position within the stream.
    /// </returns>
    /// <exception cref="T:System.IO.IOException">An I/O error occurs. </exception>
    /// <exception cref="T:System.NotSupportedException">The stream does not support seeking. </exception>
    /// <exception cref="T:System.ObjectDisposedException">Methods were called after the stream was closed. </exception>
    public override long Position { get; set; }
    protected override void Dispose(bool disposing)
    {
        archive.Dispose();
        stream.Dispose();
        if (fileStream != null)
        {
            fileStream.Dispose();
        }
        base.Dispose(disposing);
    }
    /// <summary>
    ///     When overridden in a derived class, clears all buffers for this stream and causes any buffered data to be written
    ///     to the underlying device.
    /// </summary>
    /// <exception cref="T:System.IO.IOException">An I/O error occurs. </exception>
    public override void Flush()
    {
        if (stream == null)
        {
            return;
        }
        stream.Flush();
    }
    /// <summary>
    ///     When overridden in a derived class, sets the position within the current stream.
    /// </summary>
    /// <returns>
    ///     The new position within the current stream.
    /// </returns>
    /// <param name="offset">A byte offset relative to the <paramref name="origin" /> parameter. </param>
    /// <param name="origin">
    ///     A value of type <see cref="T:System.IO.SeekOrigin" /> indicating the reference point used to
    ///     obtain the new position.
    /// </param>
    /// <exception cref="T:System.IO.IOException">An I/O error occurs. </exception>
    /// <exception cref="T:System.NotSupportedException">
    ///     The stream does not support seeking, such as if the stream is
    ///     constructed from a pipe or console output.
    /// </exception>
    /// <exception cref="T:System.ObjectDisposedException">Methods were called after the stream was closed. </exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        return stream.Seek(offset, origin);
    }
    /// <summary>
    ///     When overridden in a derived class, sets the length of the current stream.
    /// </summary>
    /// <param name="value">The desired length of the current stream in bytes. </param>
    /// <exception cref="T:System.IO.IOException">An I/O error occurs. </exception>
    /// <exception cref="T:System.NotSupportedException">
    ///     The stream does not support both writing and seeking, such as if the
    ///     stream is constructed from a pipe or console output.
    /// </exception>
    /// <exception cref="T:System.ObjectDisposedException">Methods were called after the stream was closed. </exception>
    public override void SetLength(long value)
    {
        stream.SetLength(value);
    }
    /// <summary>
    ///     When overridden in a derived class, reads a sequence of bytes from the current stream and advances the position
    ///     within the stream by the number of bytes read.
    /// </summary>
    /// <returns>
    ///     The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many
    ///     bytes are not currently available, or zero (0) if the end of the stream has been reached.
    /// </returns>
    /// <param name="buffer">
    ///     An array of bytes. When this method returns, the buffer contains the specified byte array with the
    ///     values between <paramref name="offset" /> and (<paramref name="offset" /> + <paramref name="count" /> - 1) replaced
    ///     by the bytes read from the current source.
    /// </param>
    /// <param name="offset">
    ///     The zero-based byte offset in <paramref name="buffer" /> at which to begin storing the data read
    ///     from the current stream.
    /// </param>
    /// <param name="count">The maximum number of bytes to be read from the current stream. </param>
    /// <exception cref="T:System.ArgumentException">
    ///     The sum of <paramref name="offset" /> and <paramref name="count" /> is
    ///     larger than the buffer length.
    /// </exception>
    /// <exception cref="T:System.ArgumentNullException"><paramref name="buffer" /> is null. </exception>
    /// <exception cref="T:System.ArgumentOutOfRangeException">
    ///     <paramref name="offset" /> or <paramref name="count" /> is
    ///     negative.
    /// </exception>
    /// <exception cref="T:System.IO.IOException">An I/O error occurs. </exception>
    /// <exception cref="T:System.NotSupportedException">The stream does not support reading. </exception>
    /// <exception cref="T:System.ObjectDisposedException">Methods were called after the stream was closed. </exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        // Get the next set of buffered data from internal buffer
        if (offset != 0)
        {
            throw new ArgumentException("Not seekable");
        }
        if (streamOffset == stream.Length)
        {
            // when all buffered data is copied, clear the memory stream
            if (truncate)
            {
                streamOffset = 0;
                stream.SetLength(0);
            }
            if (fileStream != null && fileStream.Position == fileStream.Length)
            {
                fileNumber++;
                fileStream.Dispose();
                fileStream = null;
            }
            if (fileNumber < files.Length)
            {
                if (fileStream == null)
                {
                    string file = files[fileNumber];
                    string entry = entries[fileNumber];
                    fileStream = File.Open(file, FileMode.Open, FileAccess.Read, FileShare.Read);
                    archiveEntry = new ZipEntry(ZipEntry.CleanName(entry));
                    archive.PutNextEntry(archiveEntry);
                }
                byte[] writebuffer = new byte[buffer.Length];
                while (stream.Length - streamOffset < buffer.Length)
                {
                    int bytesRead = fileStream.Read(writebuffer, 0, writebuffer.Length);
                    if (bytesRead > 0)
                    {
                        archive.Write(writebuffer, 0, bytesRead);
                    }
                    else
                    {
                        archive.Flush();
                        break;
                    }
                }
            }
        }
        if (streamOffset == stream.Length && !hasDisposed)
        {
            hasDisposed = true;
            archive.Finish();
        }
        stream.Seek(streamOffset, SeekOrigin.Begin);
        int readCount = stream.Read(buffer, 0, count);
        streamOffset += readCount;
        return readCount;
    }
    /// <summary>
    ///     When overridden in a derived class, writes a sequence of bytes to the current stream and advances the current
    ///     position within this stream by the number of bytes written.
    /// </summary>
    /// <param name="buffer">
    ///     An array of bytes. This method copies <paramref name="count" /> bytes from
    ///     <paramref name="buffer" /> to the current stream.
    /// </param>
    /// <param name="offset">
    ///     The zero-based byte offset in <paramref name="buffer" /> at which to begin copying bytes to the
    ///     current stream.
    /// </param>
    /// <param name="count">The number of bytes to be written to the current stream. </param>
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }
}

问题出在位置属性中。您可以在下面找到此任务的 2 个工作类。它们使用标准的 .NET ZipArchive 类:

public class ThroughZipStream : Stream
{
    private readonly ThroughMemoryStream _ms = new ThroughMemoryStream();
    private readonly ZipArchive _zip;
    private readonly Stream _entryStream;
    private readonly Stream _input;
    private byte[] _buffer = new byte[10000]; // buffer to read from input stream
    public ThroughZipStream(Stream input, string entryName)
    {
        _input = input;
        _zip = new ZipArchive(_ms, ZipArchiveMode.Create, true);
        var entry = _zip.CreateEntry(entryName);
        _entryStream = entry.Open();
    }
    public override int Read(byte[] buffer, int offset, int count)
    {
        while (_ms.Length < count && _buffer != null)
        {
            var len = _input.Read(_buffer, 0, _buffer.Length);
            if (len == 0)
            {
                _entryStream.Dispose();
                _zip.Dispose();
                _buffer = null;
                break;
            }
            _entryStream.Write(_buffer, 0, len);
        }
        return _ms.Read(buffer, offset, count);
    }
    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } } // we can't seek
    public override bool CanWrite { get { return false; } }
    public override void Flush() { }
    public override long Position { get { throw new NotImplementedException(); } set { throw new NotImplementedException(); } }
    public override long Length { get { throw new NotImplementedException(); } }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotImplementedException(); }
    public override void SetLength(long value) { throw new NotImplementedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotImplementedException(); }
}
public class ThroughMemoryStream : MemoryStream
{
    private long _position;
    public override bool CanSeek { get { return false; } }
    public override void Write(byte[] buffer, int offset, int count)
    {
        base.Write(buffer, offset, count);
        _position += count;
    }
    public override int Read(byte[] buffer, int offset, int count)
    {
        var msBuffer = GetBuffer();
        var realCount = Math.Min(Length, count);
        Array.Copy(msBuffer, 0, buffer, 0, realCount);
        Array.Copy(msBuffer, realCount, msBuffer, 0, Length - realCount);
        SetLength(Length - realCount);
        return (int)realCount;
    }
    public override long Position
    {
        get { return _position; } 
        set { throw new NotImplementedException(); }
    }
}

用法:

var compressedStream = new ThroughZipStream(fileStream, "somefile.txt")

PS:我遇到了同样的问题,无法谷歌任何解决方案。这对我来说是一个很大的惊喜,我认为这种方式是管道流的最佳方式。

相关内容

  • 没有找到相关文章

最新更新