使用RX构建传感器监控系统



请参阅在RX中合并多个自定义可观察器以获取背景信息。

我的场景是,我有许多任意的传感器(硬件)。我已经用C#编写了一些可以连接到这些传感器的可插拔模块。它们目前每个都使用一个线程在计时器上运行一个采集例程。更大的目标是,一旦我了解了如何将轮询更改为RX!

需要分组监控这些传感器,所以我认为会有一个聚合主题,监控器可以订阅特定传感器组(温度、信号强度等)的更新,并可能根据传感器的读数对系统的行为进行更改。

此外,每个传感器都可能连接到日志观察器以记录其当前状态,监视器将连接到日志观察器以记录它的决策

同样的设计模式也适用于我们引入的任何新传感器、监视器或记录器。

下面的示例代码:

using System;
using System.Threading;
using System.Collections.Generic;
namespace Soln
    {
    class MainClass
    {
        public static void Main (string[] args)
        {
            Console.WriteLine ("Hello World!");
            var sensorA = new ASensor ();
            sensorA.Start ();
            var sensorB = new BSensor ();
            sensorB.Start ();
            var list = new List<ICustomEventHandler<string>> ();
            list.Add (sensorA);
            list.Add (sensorB);
            var strObserver = new StringObserver (list);
            strObserver.StartMonitor ();
            Console.Read ();
            sensorA.Stop ();
            sensorB.Stop ();
        }
    }
    //its a modular framework so every module implements
    //this interface to interface to a core that loads them up etc
    public interface IPlugin
    {
        bool Start();
        void Stop();
    }
    public interface ICustomEventHandler<T>
    {
        event MyEventHandler<T> SomethingHappened;
    }
    //most sensors inherit from a base class and
    //most create a thread to work in. 
    //The base interface also has an event that it uses to transmit
    //notifications. The actual eventhandler is genericised so
    //can be anything from a primitive to an actual object. Each plugin
    //can additionally transmit multiply types but this is a basic example.
    //hopefully once i can understand how rx works better , i can change the event handling to an IObservable interface
    public abstract class Plugin<T>:IPlugin,ICustomEventHandler<T>
    {   
        Thread oThread;
        protected volatile bool _continueWorking = false;
        #region IPlugin implementation
        public bool Start ()
        {
            oThread = new Thread (DoWork);
            _continueWorking = true;
            oThread.Start ();
            return true;
        }
        protected abstract void DoWork();
        public void Stop ()
        {
            _continueWorking = false;
        }
        protected void RaiseEvent(T eventMessage)
        {
            if (SomethingHappened != null) {
                SomethingHappened (eventMessage);
                Console.WriteLine (eventMessage);
            }
        }
        #endregion
        public event MyEventHandler<T> SomethingHappened;
    }
    public class ASensor:Plugin<string>
    {
        protected override void DoWork()
        {
            //can't share the code for company reasons
            while (_continueWorking) {
                Console.WriteLine (" A doing some work");
                Thread.Sleep (1000);
                RaiseEvent ("ASensor has an event");
            }
        }
    }
    public delegate void MyEventHandler<T>(T foo);
    public class BSensor:Plugin<string>
    {
        protected override void DoWork()
        {
            //can't share the code for company reasons
            while (_continueWorking) {
                Console.WriteLine ("B doing some work");
                Thread.Sleep (1000);
                RaiseEvent ("BSensor has an event");
            }
        }
    }
    //the observer should be strongly typed and take a list of 
    //plugins to monitor. At least those are my current thoughts,happy
    //to find a better way. There could be multiple observers all monitoring
    //the same plugins for different purposes
    public abstract class Observer<T>
    {
        protected List<ICustomEventHandler<T>> Plugins;
        protected Observer(List<ICustomEventHandler<T>> plugins)
        {
            Plugins = plugins;
        }
        //use rx to subscribe to all events 
        public abstract void StartMonitor ();
    }
    public class StringObserver:Observer<string>
    {
        public StringObserver(List<ICustomEventHandler<string>> plugins)
            :base(plugins)
        {
        }
        //subscribe to all plugin events in list using rx merge?
        //monitor and log to file
        public override void StartMonitor ()
        {
            //throw new NotImplementedException ();
        }
    }
}

感谢阅读

这不是一个完全直接的答案,但它可能会让您了解代码的发展方向。

因为你还没有给我们传感器的代码,我真的不能给你一个具体的解决方案,但如果你让我将你当前的代码转换为Rx,那么我可能会做这样的事情:

Func<string, IObservable<string>> generate = t =>
    Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(x => t);
var subject = new Subject<IObservable<string>>();
using (var subscription = subject.Merge().Subscribe(Console.WriteLine))
{
    subject.OnNext(generate("A doing some work"));
    subject.OnNext(generate("B doing some work"));
    Console.ReadLine();
}

现在Func<string, IObservable<string>>只是消除了一点重复,但如果没有它,我可以在5行中复制代码的功能(其中包括Console.ReadLine();)。

你能给我们看一下传感器代码吗?

相关内容

  • 没有找到相关文章

最新更新