我有一段计算资产的代码。有数百万个这样的数据流,所以我想计算流中的所有数据。我当前的"管道"是这样的:
我有一个作为数据阅读器执行的查询。
然后我的Asset类有一个接受IDataReader的构造函数;
Public Asset(IdataReader rdr){
// logic that initiates fields
}
和一个将IDataReader转换为IEnumerable
public static IEnumerable<Asset> ToAssets(IDataReader rdr) {
// make sure the reader is in the right formt
CheckReaderFormat(rdr);
// project reader into IEnumeable<Asset>
while (rdr.Read()) yield return new Asset(rdr);
}
然后传递给一个函数进行实际计算然后将其投影到IEnumerable
然后获得一个包装器,将答案暴露为IDataReader,然后将其传递给OracleBulkCopy并将流写入DB。
到目前为止,它像一个魅力。由于设置,我可以将DataReader交换为从文件读取的IEnumerable,或者将结果写入文件等。这一切都取决于我如何将类/函数串在一起。
现在:有几件事我可以计算,例如,除了正常的答案,我可以有一个DebugAnswer类,也输出一些中间数字调试。所以我想做的是将IEnumerable投射到几个输出流中,这样我就可以在这些流上放置"监听器"。这样我就不用反复查看数据了。我该怎么做呢?有点像有几个事件,然后只触发特定的代码,如果有一个附加的监听器。
有时我也写入DB,但也写入zipfile只是为了保留结果的备份。然后我想在IEnumerable上有两个'listener '。一个项目是作为IDataReader,另一个直接写入文件。
如何输出多个输出流以及如何在一个输出流上放置多个侦听器?是什么让我写出这样的数据流?
编辑
一些我想要做的伪代码:
foreach(Asset in Assets){
if(DebugListener != null){
// compute
DebugAnswer da = new DebugAnswer {result = 100};
yield da to DebugListener; // so instead of yield return yield to that stream
}
if(AnswerListener != null){
// compute basic stuff
Answer a = new Answer { bla = 200 };
yield a to AnswerListener;
}
}
提前感谢,
Gert-Jan
你所描述的听起来有点像响应式框架通过IObservable
接口提供的东西,但我不确定它是否允许多个订阅者到单个订阅流。
如果你看一下IObservable
的文档,它有一个很好的例子,如何做你正在做的事情,多个订阅者到一个对象。
您的示例使用Rx重写:
// The stream of assets
IObservable<Asset> assets = ...
// The stream of each asset projected to a DebugAnswer
IObservable<DebugAnswer> debugAnswers = from asset in assets
select new DebugAnswer { result = 100 };
// Subscribe the DebugListener to receive the debugAnswers
debugAnswers.Subscribe(DebugListener);
// The stream of each asset projected to an Anwer
IObservable<Answer> answers = from asset in assets
select new Answer { bla = 200 };
// Subscribe the AnswerListener to receive the answers
answers.Subscribe(AnswerListener);
这正是响应式扩展的工作(从4.0开始成为。net的一部分,在3.5中作为库提供)。
你不需要多个"监听器",你只需要不具有破坏性甚至不一定可转换的管道组件。
IEnumerable<T> PassThroughEnumerable<T>(IEnumerable<T> source, Action<T> action) {
foreach (T t in source) {
Action(t);
yield return t;
}
}
或者,当您在管道中进行处理时,只引发一些要使用的事件。如果你愿意,你可以同步它们:
static IEnumerable<Asset> ToAssets(IDataReader rdr) {
CheckReaderFormat(rdr);
var h = this.DebugAsset;
while (rdr.Read()) {
var a = new Asset(rdr);
if (h != null) h(a);
yield return a;
}
}
public event EventHandler<Asset> DebugAsset;
如果我没看错的话,应该可以替换或装饰包装器。WrapperDecorator
可能会将呼叫转发给正常的OracleBulkCopy
(或任何您正在使用的)并添加一些自定义调试代码。
这对你有帮助吗?
马提亚