我有一种情况,我通过串行/rs232连接到设备;在建立与设备的连接后,它将开始将传感器特定的数据泵送回客户端。
我有两个目标:a) 接收连续的传感器状态更新数据流并移交给UIb) 发送命令并等待回复
var messageObserver = Observable.Defer(() => serialPort.TakeWhile(
s => s != string.Empty))
.Repeat();
从这里开始,我想开始处理传感器特定的数据:目前我正在尝试这个
this.messageObserver.Where(s => Regex.Match(s, @"...").Success)
.Subscribe(o => OnDataArrival(o));
效果很好。。直到我想执行这样的代码
public string GetFirmwareVersion()
{
var firmwareObserver =
this.messageObserver.Where(s => Regex.Match(s, @"...").Success)
.Take(1)
.PublishLast();
var connectable = firmwareObserver.Connect();
// send command for firmware version
port.Send(new byte[] { 0x9 });
// wait for a reply up to 10 seconds
var data = firmwareObserver.Timeout(TimeSpan.FromSeconds(10)).Wait();
connectable.Dispose();
return data;
}
我从未收到任何回复,并且抛出了一个异常(显然是来自TimeOut)。如果我注释掉初始订阅者(它会触发OnDataArrival
),那么GetFirmwareVersion
代码就可以工作了!
因此,我认为我感兴趣的是实现两个目标的推荐途径:a) 在接收数据时处理通过导线传入的数据b) 连接&等待,我们仍在处理中的数据
我认为您看到了源流有多个订阅者的副作用。如果同时运行激发OnDataArrival
和GetFirmwareVersion
代码的订阅者,那么最终将针对serialPort
运行两个并发查询。
在GetFirmwareVersion
代码中发布时,必须从serialPort
发布一个源,并在所有订阅者之间共享。因此,您需要发布messageObserver,并在两个查询中使用它,并在所有订阅者都订阅后连接它。您可以在订阅所有订阅者之前进行连接(如果您想在开始从端口读取后的某个任意点获得固件版本,您的场景可能会要求这样做)-请注意,您可能会错过事件。
存在可供选择的发布运算符来避免这种情况,例如Replay()
。以下是Publish
-Connect
的基本用法:
var messageObserver = Observable.Defer(
() => serialPort.TakeWhile(s => s != string.Empty)).Repeat().Publish();
然后在订阅呼叫后:
messageObserver.Connect();
根据您的场景,有许多方法可以管理发布和连接,但这是最基本的方法。现在,订阅者将共享对串行端口的单个订阅。
对我有用的东西:
private readonly ObservableSerialPort port; // serial port wrapper
private readonly IObservable<string> messageObserver;
private readonly Subject<string> subject;
在我的类中构造函数:
this.port = new ObservableSerialPort_string("COM4");
this.messageObserver = Observable.Defer(() => port.TakeWhile(s => s != string.Empty)).Select(s => new Regex(@"...", RegexOptions.CultureInvariant).Replace(s, string.Empty)).Repeat();
subject = new Subject<string>();
var subSource = messageObserver.Subscribe(subject);
subject.Subscribe(x => Console.WriteLine("raw: " + x));
subject.Where(s => Regex.Match(s, @"...").Success)
.Subscribe(x => Console.WriteLine("gauge: " + x));
现在我可以执行这样的代码:
public string GetFirmwareVersion()
{
var f = subject.Where(s => Regex.Match(s, @"...").Success)
.Take(1)
.PublishLast();
var c = f.Connect();
// send command for firmware version
port.Send(new byte[] { 0x9 });
var data = f.Timeout(TimeSpan.FromSeconds(10)).Wait();
c.Dispose();
return data;
}
public Config GetConfiguration()
{
using (var subject2 = new Subject<Config>())
{
var f = subject.Where(s => Regex.Match(s, @"...").Success)
.Select(s =>
{
// get data via xmodem
Thread.Sleep(500);
var modem = new Modem(this.port._serialPort);
var bytes = modem.XModemReceive(true);
subject2.OnNext(new DeviceConfig(bytes));
subject2.OnCompleted();
return s;
})
.Take(1)
.PublishLast();
var c = f.Connect();
// send command for firmware version
port.Send(new byte[] { 0x2 });
var ret = subject2.Timeout(TimeSpan.FromSeconds(60)).Wait();
c.Dispose();
return ret;
}
}
@James,你可能很在行。作为Rx的新手,我能描述我以前的结果/问题的最好方式是类似于线程阻塞。使用Subject
类带来了巨大的不同。