我目前正在开发一个应用程序,该应用程序通过方程组合了许多数据流。我希望能够做到的是:
var result = (stream1 + stream2) / stream3 + stream4 * 2;
当任何流更新时,result
都会更新。目前,我在Rx中唯一可以表示的方法是:
var result = stream1.CombineLatest(stream2, (x,y) => x+y)
.CombineLatest(stream3, (x,y) => x / y)
.CombineLatest(stream4, (x,y) => x + y*2);
这不是很清楚。
我现在的想法是:
Public class ArithmeticStream : IObservable<double>
{
public static ArithmeticStream operator +(ArithmeticStream xx, ArithmeticStream yy)
{
return Observable.CombineLatest(xx,yy, (x,y) => x + y);
}
...
}
问题是,combinellatest返回的是一个IObservable<double>
而不是算术流。
两个可能的问题:
如何透明地将IObservable<double>
转换为算术流?
有没有别的途径可以让我得到我想要的结果?
我要添加这个作为一个新的答案,因为它很不同…
所以,如果你决定使用操作符重载的方法,下面是你需要做的(好吧,至少有一种方法)…老实说,我不喜欢这种方法——虽然它确实使查询编写更加简洁,但dsl方法也有类似的"简洁",但在不依赖于操作符重载的意义上要清晰得多。
public static class ArithmeticStreamExt
{
public static ArithmeticStream Wrap(this IObservable<double> src)
{
return new ArithmeticStream(src);
}
public static ArithmeticStream Const(this double constValue)
{
return new ArithmeticStream(Observable.Return(constValue));
}
}
public class ArithmeticStream
{
private IObservable<double> _inner;
public ArithmeticStream(IObservable<double> source)
{
_inner = source;
}
public IObservable<double> Source {get { return _inner; }}
public static ArithmeticStream operator +(
ArithmeticStream left,
ArithmeticStream right)
{
return new ArithmeticStream(
left._inner.CombineLatest(right._inner, (l, r) => l + r));
}
public static ArithmeticStream operator -(
ArithmeticStream left,
ArithmeticStream right)
{
return new ArithmeticStream(
left._inner.CombineLatest(right._inner, (l, r) => l - r));
}
public static ArithmeticStream operator *(
ArithmeticStream left,
ArithmeticStream right)
{
return new ArithmeticStream(
left._inner.CombineLatest(right._inner, (l, r) => l * r));
}
public static ArithmeticStream operator /(
ArithmeticStream left,
ArithmeticStream right)
{
return new ArithmeticStream(
left._inner.CombineLatest(right._inner, (l, r) => l / r));
}
public static ArithmeticStream operator +(
ArithmeticStream left,
IObservable<double> right)
{
return new ArithmeticStream(
left._inner.CombineLatest(right, (l, r) => l + r));
}
public static ArithmeticStream operator -(
ArithmeticStream left,
IObservable<double> right)
{
return new ArithmeticStream(
left._inner.CombineLatest(right, (l, r) => l - r));
}
public static ArithmeticStream operator *(
ArithmeticStream left,
IObservable<double> right)
{
return new ArithmeticStream(
left._inner.CombineLatest(right, (l, r) => l * r));
}
public static ArithmeticStream operator /(
ArithmeticStream left,
IObservable<double> right)
{
return new ArithmeticStream(
left._inner.CombineLatest(right, (l, r) => l / r));
}
}
和一个试验台:
void Main()
{
var s1 = new Subject<double>();
var s2 = new Subject<double>();
var s3 = new Subject<double>();
var s4 = new Subject<double>();
var result = (s1.Wrap() + s2) / s3 + (s4.Wrap() * 2.0.Const());
using(result.Source.Subscribe(Console.WriteLine))
{
s1.OnNext(1.0);
s2.OnNext(2.0);
s3.OnNext(3.0);
s4.OnNext(4.0);
}
}
嗯…我认为你可以用dsl风格相对简单地完成它(不需要摆弄操作符):
public static class Ext
{
public static IObservable<double> Const(this double constant)
{
return Observable.Return(constant);
}
public static IObservable<double> Plus(this IObservable<double> left, IObservable<double> right)
{
return left.CombineLatest(right, (l,r) => l + r);
}
public static IObservable<double> Minus(this IObservable<double> left, IObservable<double> right)
{
return left.CombineLatest(right, (l,r) => l - r);
}
public static IObservable<double> Times(this IObservable<double> left, IObservable<double> right)
{
return left.CombineLatest(right, (l,r) => l * r);
}
public static IObservable<double> Over(this IObservable<double> left, IObservable<double> right)
{
return left.CombineLatest(right,(l,r) => l / r);
}
}
所以你的查询应该是:
var result = (s1.Plus(s2)).Over(s3)
.Plus(s4)
.Times(2.0.Const());
或者,对于一个非常健谈的变体:
var verboseResult =
(s1.Do(Console.WriteLine).Plus(s2.Do(Console.WriteLine)))
.Over(s3.Do(Console.WriteLine))
.Plus(s4.Do(Console.WriteLine))
.Times(2.0.Const())
.Do(x => Console.WriteLine("(s1 + s2) / s3 + s4 * 2 = " + x));
考虑创建一个使用3,4,5等IObservables和一个匹配性的结果选择器函数的combinellatest版本。这将允许您将算术运算表示为普通的双精度浮点运算-非常简单和干净。
如果你需要帮助实现这些,就说出来,我会给你一个例子。
编辑
我指的是已经存在的combinellatest重载,只是没有记录。
这不如操作符重载好,但我很确定你不能做你想要的那种操作符重载,因为在扩展方法中不支持操作符重载。
var stream1 = Observable.Generate(1.0, x => x < 1000.0, x => x + 0.5, x => x);
var stream2 = Observable.Generate(1.0, x => x < 1000.0, x => x + 0.25, x => x);
var stream3 = Observable.Generate(1.0, x => x < 1000.0, x => x + 0.125, x => x);
var stream4 = Observable.Generate(1.0, x => x < 1000.0, x => x + 0.0625, x => x);
var result = stream1.CombineLatest(stream2, stream3, stream4, (w, x, y, z) => (w + x) / y + z * 2);
除此之外,JerKimball给出了一个很好的答案。