我是RX的新手。
我想遍历一个 IEnumerable 并发布到在各自线程中处理数据的多个数据处理程序。
下面是我的示例程序。发布工作并创建一个新线程,但 3 个行处理程序都在 1 个线程中运行。我需要 3 个线程。实现这一点的最佳方法是什么?
class Program
{
public class MyDataGenerator
{
public IEnumerable<int> myData()
{
//Heavy lifting....Don't want to process more than once.
yield return 1;
yield return 2;
yield return 3;
yield return 4;
yield return 5;
yield return 6;
}
}
static void Main(string[] args)
{
MyDataGenerator h = new MyDataGenerator();
Console.WriteLine("Thread id " + Thread.CurrentThread.ManagedThreadId.ToString());
//
var shared = h.myData().ToObservable().Publish();
///////////////////////////////
// Row Handling Requirements
//
// 1. Single Scan of IEnumerable.
// 2. Row handlers process data in their own threads.
// 3. OK if scanning thread blocks while data is processed
//
//Create the RowHandlers
MyRowHandler rn1 = new MyRowHandler();
rn1.ido = shared.Subscribe(i => rn1.processID(i));
MyRowHandler rn2 = new MyRowHandler();
rn2.ido = shared.Subscribe(i => rn2.processID(i));
MyRowHandler rn3 = new MyRowHandler();
rn3.ido = shared.Subscribe(i => rn3.processID(i));
//
shared.Connect();
}
public class MyRowHandler
{
public IDisposable ido = null;
public void processID(int i)
{
var o = Observable.Start(() =>
{
Console.WriteLine(String.Format("Start Thread ID {0} Int{1}", Thread.CurrentThread.ManagedThreadId, i));
Thread.Sleep(30);
Console.WriteLine("Done Thread ID"+Thread.CurrentThread.ManagedThreadId.ToString());
}
);
o.First();
}
}
}
发现:
从Rx获得的编码速度和代码质量收益是以牺牲性能为代价的。 毫无疑问,任务/委托的速度要快几倍。 这意味着关于Rx,需要了解的最重要的事情是何时使用Rx。 以下是摘要指南草案。 对于大卷,我可以看到 Rx 在 chucking、组合和其他许多流多处理程序模型中的使用;但是,基本异步不应使用 rx。
我会发布带有矩阵指南的图像,但该网站不允许我发布图像
如果我正确理解您的排序要求,并且您想要三个并行运行的扫描,您只需在 TaskPool 上观察并从那里订阅;
...
//Create the RowHandlers
MyRowHandler rn1 = new MyRowHandler();
rn1.ido = shared.ObserveOn(Scheduler.TaskPool).Subscribe(i => rn1.processID(i));
...
请注意,由于您随后异步运行并且主线程不会等待扫描完成,因此您的程序将立即终止,除非您在程序末尾放置一个Console.ReadKey()
。
编辑:关于"一路"运行相同的线程,您为此安排的时间有点奇怪。如果将可观察量放在行处理程序中,则可以使用 Scheduler.NewThread 并获得良好的结果;
...
var rowHandler1 = new MyRowHandler();
rowHandler1.ido = shared.ObserveOn(Scheduler.NewThread).Subscribe(rowHandler1.ProcessID);
...
public void ProcessID(int i)
{
Console.WriteLine(String.Format("Start Thread ID {0} Int{1}", Thread.CurrentThread.ManagedThreadId, i));
Thread.Sleep(30);
Console.WriteLine("Done Thread ID" + Thread.CurrentThread.ManagedThreadId.ToString(CultureInfo.InvariantCulture));
}
这将为每个订阅提供自己的线程,并保留它。