Rx带锁定的并发计划



我又在与Rx作斗争了。这一次我遇到了Observable.Interval.的问题

我的要求是:

  • 我需要每1秒运行一次数据收集
  • 我需要每5秒钟检查一次参数的变化
  • 我在检查更改时无法收集数据
  • 如果数据收集或检查更改所花费的时间超过1秒,请不要将这些勾号排队,只需跳过它们
  • 如果在数据收集过程中出现更改复选框,我希望它等待执行

我尝试过将Observables用于间隔,并发现默认情况下,间隔会将错过的蜱虫排队!最后,在创建了一个完整的控制台应用程序后,我找到了一个示例来演示我需要什么。这个实现似乎只适用于Scheduler.NewThread。我的新问题是,我根本无法测试这个实现,因为测试调度器似乎是CurrentThread。

我的控制台应用程序示例代码:

var otherThreadScheduler = Scheduler.NewThread; 
cancel = otherThreadScheduler.Schedule(
    TimeSpan.FromSeconds(1),
    recursive =>
        {
            lock (obj)
            {
                Console.WriteLine(
                    "Processing Data - Thread ID = " + Thread.CurrentThread.ManagedThreadId);
                var t = new Task(
                    () =>
                        {
                            Console.WriteLine(
                                "Hi I'm the task on thread {0}",
                                Thread.CurrentThread.ManagedThreadId);
                            Thread.Sleep(2000);
                        });
                t.Start();
                Console.WriteLine(
                    "Processing Data Waiting for it to finish - Thread ID = "
                    + Thread.CurrentThread.ManagedThreadId);
                t.Wait();
            }
            Console.WriteLine("Processing Data finished - Thread ID = " + Thread.CurrentThread.ManagedThreadId);
            recursive(TimeSpan.FromSeconds(1));
        });
cancel2 = otherThreadScheduler.Schedule(
    TimeSpan.FromSeconds(1),
    recursive =>
        {
            lock (obj)
            {
                Console.WriteLine("Processing Detection - Thread ID = " + Thread.CurrentThread.ManagedThreadId);
                Thread.Sleep(10000);
            }
            recursive(TimeSpan.FromSeconds(5));
        });

这是不可测试的原因,因为我的真实代码中的任务是可模拟的,所以我模拟它循环,直到我发出停止的信号,但因为我的代码完成了任务。Wait(),我当前的线程阻塞,所以我永远不能用信号通知任务返回。所有这些的目的是模拟长时间运行的数据收集,并验证更改检测不会启动。

所以我的问题是:有没有更优雅的解决方案来满足我的需求?

不使用Task.Wait并阻止当前线程

最好将完成代码作为延续编写。即使用CCD_ 2。

相关内容

  • 没有找到相关文章