请参阅在RX中合并多个自定义可观察器以获取背景信息。
我的场景是,我有许多任意的传感器(硬件)。我已经用C#编写了一些可以连接到这些传感器的可插拔模块。它们目前每个都使用一个线程在计时器上运行一个采集例程。更大的目标是,一旦我了解了如何将轮询更改为RX!
需要分组监控这些传感器,所以我认为会有一个聚合主题,监控器可以订阅特定传感器组(温度、信号强度等)的更新,并可能根据传感器的读数对系统的行为进行更改。
此外,每个传感器都可能连接到日志观察器以记录其当前状态,监视器将连接到日志观察器以记录它的决策
同样的设计模式也适用于我们引入的任何新传感器、监视器或记录器。
下面的示例代码:
using System;
using System.Threading;
using System.Collections.Generic;
namespace Soln
{
class MainClass
{
public static void Main (string[] args)
{
Console.WriteLine ("Hello World!");
var sensorA = new ASensor ();
sensorA.Start ();
var sensorB = new BSensor ();
sensorB.Start ();
var list = new List<ICustomEventHandler<string>> ();
list.Add (sensorA);
list.Add (sensorB);
var strObserver = new StringObserver (list);
strObserver.StartMonitor ();
Console.Read ();
sensorA.Stop ();
sensorB.Stop ();
}
}
//its a modular framework so every module implements
//this interface to interface to a core that loads them up etc
public interface IPlugin
{
bool Start();
void Stop();
}
public interface ICustomEventHandler<T>
{
event MyEventHandler<T> SomethingHappened;
}
//most sensors inherit from a base class and
//most create a thread to work in.
//The base interface also has an event that it uses to transmit
//notifications. The actual eventhandler is genericised so
//can be anything from a primitive to an actual object. Each plugin
//can additionally transmit multiply types but this is a basic example.
//hopefully once i can understand how rx works better , i can change the event handling to an IObservable interface
public abstract class Plugin<T>:IPlugin,ICustomEventHandler<T>
{
Thread oThread;
protected volatile bool _continueWorking = false;
#region IPlugin implementation
public bool Start ()
{
oThread = new Thread (DoWork);
_continueWorking = true;
oThread.Start ();
return true;
}
protected abstract void DoWork();
public void Stop ()
{
_continueWorking = false;
}
protected void RaiseEvent(T eventMessage)
{
if (SomethingHappened != null) {
SomethingHappened (eventMessage);
Console.WriteLine (eventMessage);
}
}
#endregion
public event MyEventHandler<T> SomethingHappened;
}
public class ASensor:Plugin<string>
{
protected override void DoWork()
{
//can't share the code for company reasons
while (_continueWorking) {
Console.WriteLine (" A doing some work");
Thread.Sleep (1000);
RaiseEvent ("ASensor has an event");
}
}
}
public delegate void MyEventHandler<T>(T foo);
public class BSensor:Plugin<string>
{
protected override void DoWork()
{
//can't share the code for company reasons
while (_continueWorking) {
Console.WriteLine ("B doing some work");
Thread.Sleep (1000);
RaiseEvent ("BSensor has an event");
}
}
}
//the observer should be strongly typed and take a list of
//plugins to monitor. At least those are my current thoughts,happy
//to find a better way. There could be multiple observers all monitoring
//the same plugins for different purposes
public abstract class Observer<T>
{
protected List<ICustomEventHandler<T>> Plugins;
protected Observer(List<ICustomEventHandler<T>> plugins)
{
Plugins = plugins;
}
//use rx to subscribe to all events
public abstract void StartMonitor ();
}
public class StringObserver:Observer<string>
{
public StringObserver(List<ICustomEventHandler<string>> plugins)
:base(plugins)
{
}
//subscribe to all plugin events in list using rx merge?
//monitor and log to file
public override void StartMonitor ()
{
//throw new NotImplementedException ();
}
}
}
感谢阅读
这不是一个完全直接的答案,但它可能会让您了解代码的发展方向。
因为你还没有给我们传感器的代码,我真的不能给你一个具体的解决方案,但如果你让我将你当前的代码转换为Rx,那么我可能会做这样的事情:
Func<string, IObservable<string>> generate = t =>
Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(x => t);
var subject = new Subject<IObservable<string>>();
using (var subscription = subject.Merge().Subscribe(Console.WriteLine))
{
subject.OnNext(generate("A doing some work"));
subject.OnNext(generate("B doing some work"));
Console.ReadLine();
}
现在Func<string, IObservable<string>>
只是消除了一点重复,但如果没有它,我可以在5行中复制代码的功能(其中包括Console.ReadLine();
)。
你能给我们看一下传感器代码吗?