如何将非一次性对象与冷观测的每次订阅绑定



我可能正在搜索一个自定义的Observable.Using方法,该方法不限于一次性资源。我有一个冷的IObservable,它维护一些内部状态,例如Random实例。此实例不应与IObservable本身绑定,而应与其每个订阅绑定。每个订阅者都应该使用该资源的不同实例。以下面的GetRandomNumbers方法为例:

static IObservable<int> GetRandomNumbers()
{
var random = new Random(0);
return Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Select(x => random.Next(1, 10))
.Take(10);
}

这个方法产生10个随机数。RNG是一个用常量种子初始化的Random实例,因此它应该始终产生相同的10个数字。但遗憾的是,事实并非如此:

var stream = GetRandomNumbers();
Console.WriteLine($"Results A: {String.Join(", ", await stream.ToArray())}");
Console.WriteLine($"Results B: {String.Join(", ", await stream.ToArray())}");

输出:

Results A: 7, 8, 7, 6, 2, 6, 9, 4, 9, 3
Results B: 3, 5, 6, 5, 9, 1, 8, 9, 7, 3

streamobservable的每个订阅者都会得到一组不同的数字!发生的情况是,所有订阅者都使用相同的Random实例。这不仅是不可取的,而且还会造成损坏对象内部状态的风险,因为Random类不是线程安全的。

我试图解决这个问题的方法是使用Using运算符,它有一个Func<TResource> resourceFactory参数:

static IObservable<int> GetRandomNumbers()
{
return Observable.Using(() => new Random(0), random =>
Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Select(x => random.Next(1, 10))
.Take(10)
);
}

如果Random是一次性的,这将是一个完美的解决方案(我用一个一次性类测试了它,并按预期工作(,但它不是,因此代码无法编译:

类型"System.Random"不能用作泛型类型或方法"Observable"中的类型参数"TResource"。使用<TResult、TResource>(Func,Func<TResource,IOobservable>('。没有从"System.Random"到"System.IDisposable"的隐式引用转换。

你能为这个问题提出一个解决方案吗?

Observable.Defer是您的朋友,如果您想要每个订阅者的状态。

试试这个:

static IObservable<int> GetRandomNumbers() =>
Observable
.Defer(() =>
{
var random = new Random(0);
return Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Select(x => random.Next(1, 10))
.Take(10);
});

我的结果:

Results A: 7, 8, 7, 6, 2, 6, 9, 4, 9, 3
Results B: 7, 8, 7, 6, 2, 6, 9, 4, 9, 3

最新更新