C# RX (System.Reactive) - 异步 - 将 IEnumerable 发布<DataRow>到多个观测数据处理程序



我是RX的新手。

我想遍历一个 IEnumerable 并发布到在各自线程中处理数据的多个数据处理程序。

下面是我的示例程序。发布工作并创建一个新线程,但 3 个行处理程序都在 1 个线程中运行。我需要 3 个线程。实现这一点的最佳方法是什么?

class Program
{
    public class MyDataGenerator
    {
        public IEnumerable<int> myData()
        {
            //Heavy lifting....Don't want to process more than once.
            yield return 1;
            yield return 2;
            yield return 3;
            yield return 4;
            yield return 5;
            yield return 6;
        }
    }
    static void Main(string[] args)
    {
        MyDataGenerator h = new MyDataGenerator();
        Console.WriteLine("Thread id " + Thread.CurrentThread.ManagedThreadId.ToString());
        //
        var shared = h.myData().ToObservable().Publish();
        ///////////////////////////////
        //  Row Handling Requirements
        //
        //      1. Single Scan of IEnumerable. 
        //      2. Row handlers process data in their own threads. 
        //      3. OK if scanning thread blocks while data is processed
        //
        //Create the RowHandlers
        MyRowHandler rn1 = new MyRowHandler();
        rn1.ido = shared.Subscribe(i => rn1.processID(i));
        MyRowHandler rn2 = new MyRowHandler();
        rn2.ido = shared.Subscribe(i => rn2.processID(i));
        MyRowHandler rn3 = new MyRowHandler();
        rn3.ido = shared.Subscribe(i => rn3.processID(i));
        //
        shared.Connect();
    }
    public class MyRowHandler
    {
        public IDisposable ido = null;
        public void processID(int i)
        {
            var o = Observable.Start(() =>
                                        {
                                            Console.WriteLine(String.Format("Start Thread ID {0} Int{1}", Thread.CurrentThread.ManagedThreadId, i));
                                            Thread.Sleep(30);
                                            Console.WriteLine("Done Thread ID"+Thread.CurrentThread.ManagedThreadId.ToString());
                                        }
                                    );
            o.First(); 
        }
    }
}

发现:

从Rx获得的编码速度和代码质量收益是以牺牲性能为代价的。 毫无疑问,任务/委托的速度要快几倍。 这意味着关于Rx,需要了解的最重要的事情是何时使用Rx。 以下是摘要指南草案。 对于大卷,我可以看到 Rx 在 chucking、组合和其他许多流多处理程序模型中的使用;但是,基本异步不应使用 rx。

我会发布带有矩阵指南的图像,但该网站不允许我发布图像

如果我正确理解您的排序要求,并且您想要三个并行运行的扫描,您只需在 TaskPool 上观察并从那里订阅;

...
//Create the RowHandlers
MyRowHandler rn1 = new MyRowHandler();
rn1.ido = shared.ObserveOn(Scheduler.TaskPool).Subscribe(i => rn1.processID(i));
...

请注意,由于您随后异步运行并且主线程不会等待扫描完成,因此您的程序将立即终止,除非您在程序末尾放置一个Console.ReadKey()

编辑:关于"一路"运行相同的线程,您为此安排的时间有点奇怪。如果将可观察量放在行处理程序中,则可以使用 Scheduler.NewThread 并获得良好的结果;

...
var rowHandler1 = new MyRowHandler();
rowHandler1.ido = shared.ObserveOn(Scheduler.NewThread).Subscribe(rowHandler1.ProcessID);
...
public void ProcessID(int i)
{
    Console.WriteLine(String.Format("Start Thread ID {0} Int{1}", Thread.CurrentThread.ManagedThreadId, i));
    Thread.Sleep(30);
    Console.WriteLine("Done Thread ID" + Thread.CurrentThread.ManagedThreadId.ToString(CultureInfo.InvariantCulture));
}

这将为每个订阅提供自己的线程,并保留它。

相关内容

最新更新