试图了解Subject<T>
、ReplaySubject<T>
和其他的工作原理。以下是示例:
(受试者可观察和观察者)
public IObservable<int> CreateObservable()
{
Subject<int> subj = new Subject<int>(); // case 1
ReplaySubject<int> subj = new ReplaySubject<int>(); // case 2
Random rnd = new Random();
int maxValue = rnd.Next(20);
Trace.TraceInformation("Max value is: " + maxValue.ToString());
subj.OnNext(-1); // specific value
for(int iCounter = 0; iCounter < maxValue; iCounter++)
{
Trace.TraceInformation("Value: " + iCounter.ToString() + " is about to publish");
subj.OnNext(iCounter);
}
Trace.TraceInformation("Publish complete");
subj.OnComplete();
return subj;
}
public void Main()
{
//
// First subscription
CreateObservable()
.Subscribe(
onNext: (x)=>{
Trace.TraceInformation("X is: " + x.ToString());
});
//
// Second subscribe
CreateObservable()
.Subscribe(
onNext: (x2)=>{
Trace.TraceInformation("X2 is: " + x.ToString());
});
案例1:奇怪的情况是-当我使用Subject<T>
时,没有订阅(??)-我从来没有看到"X是:"文本-我只看到"值是:"one_answers"最大值是"。。。为什么Subject<T>
不向订阅推送值?
情况2:如果我使用ReplaySubject<T>
-我确实在Subscription中看到了值,但我无法将Defer
选项应用于任何内容。不到Subject
且不到Observable。。。。因此,每个订阅都会收到不同的值,因为CreateObservable
函数是冷可观察的。Defer
在哪里?
每当你需要凭空创建一个可观察的东西时,observable.create应该是第一个想到的。受试者在两种情况下进入画面:
-
您需要某种"可寻址端点"来将数据馈送到,以便所有订阅者都能接收到数据。将其与同时具有调用端(通过委托调用)和订阅端(通过与+-和-=语法组合的委托)的.NET事件进行比较。你会发现,在很多情况下,使用Observable.Create.可以达到同样的效果
-
您需要在查询管道中对消息进行多播,以便在不触发多个订阅的情况下,通过查询逻辑中的许多分支有效地共享可观察的序列。(想想在宿舍订阅一本你最喜欢的杂志,然后在信箱后面放一台复印机。你仍然只需订阅一本,尽管你所有的朋友都可以在信箱上阅读通过OnNext交付的杂志。)
此外,在很多情况下,Rx中已经有一个内置的基元,它可以满足您的需求。例如,有From*工厂方法与现有概念(如事件、任务、异步方法、可枚举序列)进行桥接,其中一些方法在封面下使用主题。对于多播逻辑的第二种情况,有Publish、Replay等运算符家族。
您需要注意代码的执行时间。
在"案例1"中,当您使用Subject<T>
时,您会注意到对OnNext
&CCD_ 12在通过CCD_ 13方法返回可观测值之前结束。由于您使用的是Subject<T>
,这意味着任何后续订阅都将错过所有值,因此您应该期望得到您所得到的东西——什么都没有。
你必须推迟对这个主题的操作,直到你让观察员订阅为止。使用Create
方法执行此操作。方法如下:
public IObservable<int> CreateObservable()
{
return Observable.Create<int>(o =>
{
var subj = new Subject<int>();
var disposable = subj.Subscribe(o);
var rnd = new Random();
var maxValue = rnd.Next(20);
subj.OnNext(-1);
for(int iCounter = 0; iCounter < maxValue; iCounter++)
{
subj.OnNext(iCounter);
}
subj.OnCompleted();
return disposable;
});
}
为了简洁起见,我删除了所有的跟踪代码。
因此,现在,对于每个订阅者,您都可以在Create
方法中获得代码的新执行,并且现在可以从内部Subject<T>
中获得值。
Create
方法的使用通常是创建从方法返回的可观察性的正确方法。
或者,您可以使用ReplaySubject<T>
并避免使用Create
方法。然而,由于许多原因,这是没有吸引力的。它强制在创建时计算整个序列。这会给你一个冷冰冰的观察结果,你本可以在不使用回放主题的情况下更有效地制作。
现在,顺便说一句,你应该尽量避免使用主题。一般的规则是,如果你在使用一个主题,那么你就做错了什么。CreateObservable
方法最好这样写:
public IObservable<int> CreateObservable()
{
return Observable.Create<int>(o =>
{
var rnd = new Random();
var maxValue = rnd.Next(20);
return Observable.Range(-1, maxValue + 1).Subscribe(o);
});
}
根本不需要主题。
如果这有帮助,请告诉我。