如何按动态数量的计数器集拆分可观察量,并在不同的线程上运行每个可观察量?



我有未知数量的CounterSet(可能是 2、3、10 等),想按CounterSet数拆分我的observable,并希望在不同的线程上运行每个observable。如何实现这一点?

下面的代码修复了observable数,并且都在同一线程中运行,

var observable = PerfCounterObservable.FromFile(@"C:FilesBasicPerfCounters.blg");
var CounterSet1_observable = from a in observable
group a by new { a.Machine, a.Instance, a.Timestamp } into groups
from g in groups.Where(a => a.CounterSet == "CounterSet1")
select new
{
groups.Key.Machine,
groups.Key.Instance,
groups.Key.Timestamp,
Counters = g
};
CounterSet1_observable.Subscribe(z => Console.WriteLine("{0}: {1}", z.Counters.CounterSet + " : " + z.Counters.CounterName, Thread.CurrentThread.ManagedThreadId));
var CounterSet2_observable = from a in observable
group a by new { a.Machine, a.Instance, a.Timestamp } into groups
from g in groups.Where(a => a.CounterSet == "CounterSet2")
select new
{
groups.Key.Machine,
groups.Key.Instance,
groups.Key.Timestamp,
Counters = g
};
CounterSet2_observable.Subscribe(z => Console.WriteLine("{0}: {1}", z.Counters.CounterSet + " : " + z.Counters.CounterName, Thread.CurrentThread.ManagedThreadId));

Sentinel 给出的答案可以在没有内部订阅的情况下完成:

Observable
.Interval(TimeSpan.FromSeconds(1))
.GroupBy(num => num % 2 == 0)
.SubscribeOn(TaskPoolScheduler.Default)
.SelectMany(group => group)
.Do(num => Console.WriteLine($"Work done on thread: {CurrentThread.ManagedThreadId} Num: {num}"))
.ObserveOn(Scheduler.Default)
.Subscribe(num => Console.WriteLine($"Displayed on thread: {CurrentThread.ManagedThreadId} Num: {num}"));

此外,ObserveOn用于选择要订阅的线程,因此,如果您在控制台应用程序上显示结果,那很好,但是在 WPF 之类的东西中尝试一下,你会得到一个令人讨厌的惊喜。

GroupBy linq 就像 Rx "Filter"。这将为您提供"分组可观察"流。订阅该流时,在每个新的分组可观察对象上,使用 ObserveOn 订阅该分组可观察对象,这会将观察者置于自己的线程上。下面的代码说明了这个场景,我认为可以很容易地适应你上面的情况:

void TestGroupBy()
{
var source1 = Observable.Interval(TimeSpan.FromSeconds(1));
var m = source1.
GroupBy(x => { return x % 2 == 0; });
m.Subscribe(
v => {
Console.WriteLine("Subscribing to " + v.Key);
v.ObserveOn(Scheduler.Default).
Subscribe(seq =>
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId+"   "+seq);
});
}
);
}

相关内容

  • 没有找到相关文章

最新更新