Stream multicasting - read a stream once, but process it in several different ways, with minimal buffering



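For scalability and to conserve resources, it is best to avoid reading an entire input stream into memory. Instead, try to process it as a stream, reading small chunks at a time. This is easy to do in .NET when you want to do one thing with the data, such as reading it from a web request and saving it to a file. Simple example: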

input.CopyTo(output, 4096); // reads chunks of up to 4096 bytes and writes them to `output`

But when I want to do several things with that data, it gets trickier. For example, I want to:

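  • Compute the length of a stream that doesn't support the Length property. We can do this with a custom DummyStream that does nothing with the data written to it except keep track of the length.
  • Compute the MD5 of the stream.
  • Determine the MIME type of the data. FindMimeFromData only needs the first 256 bytes of the stream.
  • Save the entire stream to a database.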

...but with only a single pass over the input stream, and with minimal buffering.
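None of these tasks needs the whole stream in memory at once. For the MD5, for instance, a minimal sketch (the names ChunkHashing and Md5OfChunks are hypothetical) feeds chunks one at a time to HashAlgorithm.TransformBlock and finishes with TransformFinalBlock, which yields the same digest as hashing all of the data in one call:

```csharp
using System.Collections.Generic;
using System.Security.Cryptography;

public static class ChunkHashing
{
    // Hashes a sequence of chunks without ever concatenating them.
    public static byte[] Md5OfChunks(IEnumerable<byte[]> chunks)
    {
        using (var md5 = MD5.Create())
        {
            foreach (var chunk in chunks)
            {
                // Updates the running hash state; no output buffer is needed.
                md5.TransformBlock(chunk, 0, chunk.Length, null, 0);
            }
            // Finalizes the hash; md5.Hash is only valid after this call.
            md5.TransformFinalBlock(new byte[0], 0, 0);
            return md5.Hash;
        }
    }
}
```

The chunks can come from any single read loop, which is what allows the hash to share one pass over the input with the other tasks.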

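I'm sure this is possible. I could probably coordinate several threads: one that actually reads from the input stream, and one for each "processing" task I want to perform. But done badly, that can easily become quite complex and fragile.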

My questions are these:

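  • Do I have to use multiple threads? I'd like some kind of coroutine solution where all the processing is interleaved on a single thread.
  • Is there a way to take advantage of C#'s async features or the Reactive Extensions to simplify this?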

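I'm having trouble wrapping my head around this. I'm looking for guidance on the best way to accomplish it (clean, maintainable, and an efficient use of machine resources), especially in light of newer technologies such as the TPL, async and Rx.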
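On the single-thread question: if each processing task can be expressed as a writable Stream that data is pushed into (rather than an Action<Stream> that pulls data), async/await already gives coroutine-style interleaving without dedicating a thread to any task. A minimal sketch, assuming .NET 4.5 or later for ReadAsync/WriteAsync; the names AsyncStreamMulticast and MulticastAsync are hypothetical:

```csharp
using System.IO;
using System.Threading.Tasks;

public static class AsyncStreamMulticast
{
    // Reads `input` once and writes each chunk to every output before reading the next,
    // so only one buffer of `bufferSize` bytes is held. Each await releases the thread
    // while I/O is pending; the steps themselves run strictly one after another.
    public static async Task MulticastAsync(this Stream input, int bufferSize, params Stream[] outputs)
    {
        var buffer = new byte[bufferSize];
        int read;
        while ((read = await input.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            foreach (var output in outputs)
            {
                await output.WriteAsync(buffer, 0, read);
            }
        }
    }
}
```

Because every write is awaited before the next read, the slowest output sets the pace; that is the trade-off for holding a single buffer.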


Here's an example of the syntax I'm envisioning:

public static void Multicast(this Stream input, params Action<Stream>[] processingActions)
{
    // TODO: ??? complicated stream multicasting logic goes here. ???
    throw new NotImplementedException();
}

You would use it like this:

// Initialized so they can be read after Multicast returns.
long length = 0;
byte[] md5 = null;
string mimeType = null;
int uploadId = 0;
input.Multicast(
    s => length = GetLength(s),
    s => md5 = CalculateMd5(s),
    s => mimeType = DetermineMimeType(s, filename, mimeTypeAsReportedByClient),
    s => uploadId = SaveToDatabase(s)
);

Here's an example of one of the processing actions:

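private static byte[] CalculateMd5(Stream input)
{
    using (var md5 = MD5.Create())
    {
        // ComputeHash(Stream) reads the stream in chunks; it never loads it all at once.
        return md5.ComputeHash(input);
    }
}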
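One way the envisioned Multicast signature could be implemented is the thread-based design described above: one reader loop, one task per action, and a small bounded queue between them, so at most a few chunks per action are buffered. This is a sketch under stated assumptions, not a hardened implementation: QueueStream is a hypothetical helper, the queue capacity of 4 and the 4096-byte chunk size are arbitrary, and an action that stops reading early (such as a MIME sniffer that only needs 256 bytes) has its remaining chunks drained and discarded so the reader never blocks on it.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class StreamMulticastExtensions
{
    public static void Multicast(this Stream input, params Action<Stream>[] processingActions)
    {
        // One small bounded queue per action; Add blocks while that action is 4 chunks behind.
        var queues = processingActions.Select(_ => new BlockingCollection<byte[]>(4)).ToArray();

        // Each action runs on its own thread-pool task and pulls its data from a QueueStream.
        var tasks = processingActions.Select((action, i) => Task.Run(() =>
        {
            try
            {
                using (var stream = new QueueStream(queues[i]))
                    action(stream);
            }
            finally
            {
                // Discard chunks the action did not read, so the reader never waits on it.
                foreach (var unused in queues[i].GetConsumingEnumerable()) { }
            }
        })).ToArray();

        try
        {
            var buffer = new byte[4096];
            int read;
            while ((read = input.Read(buffer, 0, buffer.Length)) > 0)
            {
                var chunk = new byte[read]; // one copy per chunk, shared read-only by every action
                Buffer.BlockCopy(buffer, 0, chunk, 0, read);
                foreach (var queue in queues)
                    queue.Add(chunk);
            }
        }
        finally
        {
            foreach (var queue in queues)
                queue.CompleteAdding(); // QueueStreams report end of stream once drained
        }

        Task.WaitAll(tasks); // rethrows any action's exception inside an AggregateException
    }

    // Read-only stream whose data comes from the queue filled by the reader loop.
    private sealed class QueueStream : Stream
    {
        private readonly BlockingCollection<byte[]> queue;
        private byte[] current = new byte[0];
        private int position;

        public QueueStream(BlockingCollection<byte[]> queue) { this.queue = queue; }

        public override int Read(byte[] buffer, int offset, int count)
        {
            if (count == 0) return 0;
            while (position == current.Length)
            {
                // TryTake returns false once CompleteAdding was called and the queue is empty.
                if (!queue.TryTake(out current, Timeout.Infinite))
                {
                    current = new byte[0];
                    position = 0;
                    return 0;
                }
                position = 0;
            }
            var n = Math.Min(count, current.Length - position);
            Buffer.BlockCopy(current, position, buffer, offset, n);
            position += n;
            return n;
        }

        public override bool CanRead => true;
        public override bool CanSeek => false;
        public override bool CanWrite => false;
        public override long Length => throw new NotSupportedException();
        public override long Position
        {
            get => throw new NotSupportedException();
            set => throw new NotSupportedException();
        }
        public override void Flush() { }
        public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
        public override void SetLength(long value) => throw new NotSupportedException();
        public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    }
}
```

Each action blocks a thread-pool thread while it waits for data, so with many actions a dedicated thread per action (or a larger pool) may be preferable.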

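I decided to try an Rx implementation. Here's what I have so far. It doesn't do any database writes, but it does compute the length, the MD5 hash and the MIME type with a single pass over the file and minimal buffering.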

using System;
using System.Diagnostics;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Runtime.InteropServices;
using System.Security.Cryptography;

namespace RxTest
{
    internal static class Program
    {
        private static void Main()
        {
            var expectedValues = ReadExpectedValuesDirectly("demo.txt");
            new FileInfo("demo.txt")
                .ReadAsObserveable(4096)
                .ToFileData()
                .Subscribe(observed => Compare(expectedValues, observed));
        }

        private static void Compare(FileData expected, FileData observed)
        {
            Console.WriteLine();
            WriteLine("expected", expected);
            WriteLine("observed", observed);
            Console.WriteLine();
            Debug.Assert(observed.Length == expected.Length);
            Debug.Assert(BitConverter.ToString(observed.Hash) == BitConverter.ToString(expected.Hash));
            Debug.Assert(observed.MimeType == expected.MimeType);
        }

        private static void WriteLine(string prefix, FileData observed)
        {
            Console.WriteLine("{0}: {1:N0}   {2}   {3}",
                prefix,
                observed.Length,
                observed.MimeType,
                BitConverter.ToString(observed.Hash).Replace("-", ""));
        }

        private static FileData ReadExpectedValuesDirectly(string fileName)
        {
            return new FileData
            {
                Length = new FileInfo(fileName).Length,
                Hash = MD5.Create().ComputeHash(File.ReadAllBytes(fileName)),
                MimeType = FileDataExtensions.FindMimeType(GetFirst256Bytes(fileName))
            };
        }
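
        private static byte[] GetFirst256Bytes(string path)
        {
            using (var stream = File.OpenRead(path))
            {
                // Zero-padded when the file is shorter than 256 bytes.
                var buffer = new byte[256];
                int total = 0, read;
                // Read may return fewer bytes than requested, so keep reading until
                // the buffer is full or the end of the file is reached.
                while (total < buffer.Length && (read = stream.Read(buffer, total, buffer.Length - total)) > 0)
                    total += read;
                return buffer;
            }
        }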
    }

    public class FileData
    {
        public long Length { get; set; }
        public string MimeType { get; set; }
        public byte[] Hash { get; set; }
    }

    public static class FileDataExtensions
    {
        public static IObservable<byte[]> ReadAsObserveable(this FileInfo file, int bufferSize)
        {
            return Observable.Create<byte[]>(observer =>
            {
                // Safe only because ReadAsObservable reads synchronously inside Subscribe:
                // the whole file has been pushed before the stream is disposed.
                using (var stream = file.OpenRead())
                {
                    return stream.ReadAsObservable(bufferSize).Subscribe(observer);
                }
            });
        }

        public static IObservable<byte[]> ReadAsObservable(this Stream stream, int bufferSize)
        {
            // TODO: Add scheduling/canceling
            return Observable.Create<byte[]>(observer =>
            {
                try
                {
                    var block = new byte[bufferSize];
                    int bytesRead;
                    while ((bytesRead = stream.Read(block, 0, bufferSize)) > 0)
                    {
                        // A short read does not mean end of stream, so every chunk is
                        // pushed as a fresh array of exactly bytesRead bytes.
                        var chunk = new byte[bytesRead];
                        Array.Copy(block, chunk, bytesRead);
                        observer.OnNext(chunk);
                    }
                    // Also reached when the length is an exact multiple of bufferSize.
                    observer.OnCompleted();
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }
                return Disposable.Empty;
            });
        }

        public static IObservable<FileData> ToFileData(this IObservable<byte[]> file)
        {
            return Observable.Create<FileData>(observer =>
            {
                var counter = 0;
                var connectable = file
                    .Do(_ => Console.WriteLine())
                    .Do(_ => Console.Write(++counter))
                    .Publish();
                var combineSub = Observable.CombineLatest(
                        connectable.TotalLength(),
                        connectable.ComputeHash(MD5.Create()),
                        connectable.FindMimeType(),
                        (length, hash, mimeType) => new FileData
                        {
                            Hash = hash,
                            Length = length,
                            MimeType = mimeType
                        })
                    .Subscribe(observer);
                var connectSub = connectable.Connect();
                return new CompositeDisposable(combineSub, connectSub);
            });
        }

        public static IObservable<long> TotalLength(this IObservable<byte[]> file)
        {
            return file
                .Do(block => Console.Write("\tLength()"))
                .Select(block => block.LongLength)
                .Sum();
        }

        public static IObservable<byte[]> ComputeHash(this IObservable<byte[]> file, HashAlgorithm algorithm)
        {
            return Observable.Create<byte[]>(observer =>
                file
                    .Do(block => Console.Write("\tComputeHash()"))
                    .Subscribe(
                        block => algorithm.TransformBlock(block, 0, block.Length, null, 0),
                        observer.OnError, // pass read errors on instead of throwing them
                        () =>
                        {
                            algorithm.TransformFinalBlock(new byte[0], 0, 0);
                            observer.OnNext(algorithm.Hash);
                            observer.OnCompleted();
                        }));
        }
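
        public static IObservable<string> FindMimeType(this IObservable<byte[]> file)
        {
            // Assumes the first chunk holds the first 256 bytes (typically true for local
            // file reads with a buffer of at least 256 bytes). Shorter files are zero-padded;
            // an empty file produces no value, so CombineLatest in ToFileData never fires.
            return file
                .Do(block => Console.Write("\tFindMimeType()"))
                .Take(1)
                .Select(block =>
                {
                    var first256 = new byte[256];
                    Array.Copy(block, first256, Math.Min(block.Length, first256.Length));
                    return FindMimeType(first256);
                });
        }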

        public static string FindMimeType(byte[] first256)
        {
            try
            {
                IntPtr pMimeType;
                var hr = FindMimeFromData(IntPtr.Zero, null, first256, (uint) first256.Length, null, 0, out pMimeType, 0);
                if (hr != 0 || pMimeType == IntPtr.Zero) // 0 is S_OK
                    return null;
                var sMimeTypeFromFile = Marshal.PtrToStringUni(pMimeType);
                Marshal.FreeCoTaskMem(pMimeType);
                return sMimeTypeFromFile;
            }
            catch (Exception ex)
            {
                // not exactly robust exception handling
                Console.WriteLine(ex.ToString());
                return null;
            }
        }

        // The out parameter is an IntPtr so the returned pointer is not truncated in 64-bit processes.
        [DllImport("urlmon.dll", CharSet = CharSet.Unicode)]
        private static extern int FindMimeFromData(
            IntPtr pBC,
            [MarshalAs(UnmanagedType.LPWStr)] string pwzUrl,
            [MarshalAs(UnmanagedType.LPArray)] byte[] pBuffer,
            uint cbSize,
            [MarshalAs(UnmanagedType.LPWStr)] string pwzMimeProposed,
            uint dwMimeFlags,
            out IntPtr ppwzMimeOut,
            uint dwReserved);
    }
}

I think what you want is one input stream and several output streams, and then to copy the input to all of the outputs, like this:

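// Hypothetical helper name; `input` is read exactly once and each chunk goes to every output.
public static void CopyToAll(Stream input, IList<Stream> outputs, int bufferSize = 4096)
{
    byte[] buffer = new byte[bufferSize];
    int read;
    while ((read = input.Read(buffer, 0, buffer.Length)) != 0)
    {
        foreach (var output in outputs)
        {
            output.Write(buffer, 0, read);
        }
    }
}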

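These output streams don't have to be ordinary streams; they can be special streams that, for example, only count the length. Only Write() has to do real work (the other abstract members can simply throw NotSupportedException), so a custom base Stream could be useful: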

public class OutputStreamBase : Stream
{
    private long length; // long, like Stream.Length; an int would overflow past 2 GB

    public override void Flush()
    {
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        length += count;
    }

    public override bool CanRead
    {
        get { return false; }
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return true; }
    }

    public override long Length
    {
        get { return length; }
    }

    public override long Position
    {
        get { return length; }
        set { throw new NotSupportedException(); }
    }
}

This stream can be used directly as a counting stream, or easily extended into a MIME-detecting stream.
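A sketch of how far the framework's own types go with this design, assuming .NET's CryptoStream and Stream.Null (HeaderStream and HashAndSniff are hypothetical names): wrapping Stream.Null in a CryptoStream turns any HashAlgorithm into an output stream, so the MD5 needs no custom class, and a small write-only stream that keeps only the first 256 bytes covers what FindMimeFromData needs while also counting the length.

```csharp
using System;
using System.IO;
using System.Security.Cryptography;

// Write-only stream that counts all bytes but keeps only the first `capacity` of them.
public sealed class HeaderStream : Stream
{
    private readonly byte[] header;
    private int captured;
    private long length;

    public HeaderStream(int capacity = 256) { header = new byte[capacity]; }

    // The captured prefix (shorter than capacity if the input was shorter).
    public byte[] Header
    {
        get { var copy = new byte[captured]; Array.Copy(header, copy, captured); return copy; }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        var n = Math.Min(count, header.Length - captured);
        Array.Copy(buffer, offset, header, captured, n);
        captured += n;
        length += count;
    }

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => length;
    public override long Position { get => length; set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
}

public static class HashAndSniff
{
    // One pass over `input`: returns its length, its MD5 and its first 256 bytes.
    public static (long Length, byte[] Md5, byte[] Header) Process(Stream input)
    {
        using (var md5 = MD5.Create())
        using (var hashing = new CryptoStream(Stream.Null, md5, CryptoStreamMode.Write))
        {
            var header = new HeaderStream(256);
            var outputs = new Stream[] { hashing, header };
            var buffer = new byte[4096];
            int read;
            while ((read = input.Read(buffer, 0, buffer.Length)) != 0)
                foreach (var output in outputs)
                    output.Write(buffer, 0, read);
            hashing.FlushFinalBlock(); // completes the hash; md5.Hash is valid afterwards
            return (header.Length, md5.Hash, header.Header);
        }
    }
}
```

Any other consumer that can be expressed as a writable Stream (a FileStream, or a stream that writes to the database) is just one more entry in `outputs`.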

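With Rx, you could use Observable.Create to create an observable that reads the stream, then use Publish to allow multiple subscriptions to the stream without starting it, and then call Connect on the published stream to get everything running. You could use ObserveOn and SubscribeOn on each of the different "routes" the stream data takes to decide when, where and how each part of the code runs. That means you could buffer the entire stream and submit it to the database all at once, do the same for the MD5, and count the stream with Scan or Aggregate, but you could also have a "route" that determines the MIME type and unsubscribes early. And if you need to synchronize these results back together, you can use CombineLatest.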

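This question is really interesting to me, and I wish I had time to post some real code samples right now. Unfortunately, I don't. Hopefully this gives you an idea of which operators you could use, and in which configurations, to accomplish what you're looking for.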

Here is some pseudocode for the parts other than reading the stream...

// Pseudocode: ReadStreamAsObservable, ReadMimeTypeAsObservable, ReadMD5AsObservable and
// SubmitToDatabaseAsObservable are placeholders, assumed to work on IObservable<byte[]> chunks.
var connectable = ReadStreamAsObservable(stream).Publish();
var mimeType = connectable.ReadMimeTypeAsObservable();
var md5      = connectable.ReadMD5AsObservable();
var record   = connectable.SubmitToDatabaseAsObservable(myDbConnection);
var length   = connectable.Aggregate(0L, (acc, chunk) => acc + chunk.Length);
var parts = Observable.CombineLatest(mimeType, md5, length, record,
    (m, hash, len, rec) => new {
        MimeType = m,
        MD5 = hash,
        Length = len,
        Record = rec
    });
var subscription = new CompositeDisposable(
    parts.Subscribe(x => Console.WriteLine(x)),
    connectable.Connect()
);
