多个订阅服务器和等待不工作



我有一种情况,我通过串行/rs232连接到设备;在建立与设备的连接后,它将开始将传感器特定的数据泵送回客户端。

我有两个目标:a) 接收连续的传感器状态更新数据流并移交给UIb) 发送命令并等待回复

var messageObserver = Observable.Defer(() => serialPort.TakeWhile(
                                                 s => s != string.Empty))
                                .Repeat();

从这里开始,我想开始处理传感器特定的数据:目前我正在尝试这个

this.messageObserver.Where(s => Regex.Match(s, @"...").Success)
                    .Subscribe(o => OnDataArrival(o));

效果很好。。直到我想执行这样的代码

public string GetFirmwareVersion()
{
    var firmwareObserver =
         this.messageObserver.Where(s => Regex.Match(s, @"...").Success)
                             .Take(1)
                             .PublishLast();
    var connectable = firmwareObserver.Connect();
    // send command for firmware version
    port.Send(new byte[] { 0x9 });
    // wait for a reply up to 10 seconds
    var data = firmwareObserver.Timeout(TimeSpan.FromSeconds(10)).Wait();
    connectable.Dispose();
    return data;
}

我从未收到任何回复,并且抛出了一个异常(显然是来自TimeOut)。如果我注释掉初始订阅者(它会触发OnDataArrival),那么GetFirmwareVersion代码就可以工作了!

因此,我认为我感兴趣的是实现两个目标的推荐途径:a) 在接收数据时处理通过导线传入的数据b) 连接&等待,我们仍在处理中的数据

我认为您看到了源流有多个订阅者的副作用。如果同时运行激发OnDataArrivalGetFirmwareVersion代码的订阅者,那么最终将针对serialPort运行两个并发查询。

GetFirmwareVersion代码中发布时,必须从serialPort发布一个源,并在所有订阅者之间共享。因此,您需要发布messageObserver,并在两个查询中使用它,并在所有订阅者都订阅后连接它。您可以在订阅所有订阅者之前进行连接(如果您想在开始从端口读取后的某个任意点获得固件版本,您的场景可能会要求这样做)-请注意,您可能会错过事件。

存在可供选择的发布运算符来避免这种情况,例如Replay()。以下是Publish-Connect的基本用法:

 var messageObserver = Observable.Defer(
     () => serialPort.TakeWhile(s => s != string.Empty)).Repeat().Publish();

然后在订阅呼叫后:

 messageObserver.Connect();

根据您的场景,有许多方法可以管理发布和连接,但这是最基本的方法。现在,订阅者将共享对串行端口的单个订阅。

对我有用的东西:

private readonly ObservableSerialPort port; // serial port wrapper
private readonly IObservable<string> messageObserver;
private readonly Subject<string> subject;

在我的类中构造函数:

    this.port = new ObservableSerialPort_string("COM4");
    this.messageObserver = Observable.Defer(() => port.TakeWhile(s => s != string.Empty)).Select(s => new Regex(@"...", RegexOptions.CultureInvariant).Replace(s, string.Empty)).Repeat();
    subject = new Subject<string>();
    var subSource = messageObserver.Subscribe(subject);
    subject.Subscribe(x => Console.WriteLine("raw: " + x));
    subject.Where(s => Regex.Match(s, @"...").Success)
           .Subscribe(x => Console.WriteLine("gauge: " + x));

现在我可以执行这样的代码:

    public string GetFirmwareVersion()
    {
        var f = subject.Where(s => Regex.Match(s, @"...").Success)
                       .Take(1)
                       .PublishLast();
        var c = f.Connect();

        // send command for firmware version
        port.Send(new byte[] { 0x9 });
        var data = f.Timeout(TimeSpan.FromSeconds(10)).Wait();
        c.Dispose();
        return data;
    }
    public Config GetConfiguration()
    {
        using (var subject2 = new Subject<Config>())
        {
            var f = subject.Where(s => Regex.Match(s, @"...").Success)
                           .Select(s =>
                           {
                               // get data via xmodem
                               Thread.Sleep(500);
                               var modem = new Modem(this.port._serialPort);
                               var bytes = modem.XModemReceive(true);
                               subject2.OnNext(new DeviceConfig(bytes));
                               subject2.OnCompleted();
                               return s;
                           })
                           .Take(1)
                           .PublishLast();

            var c = f.Connect();
            // send command for firmware version
            port.Send(new byte[] { 0x2 });
            var ret = subject2.Timeout(TimeSpan.FromSeconds(60)).Wait();
            c.Dispose();
            return ret;
        }
    }

@James,你可能很在行。作为Rx的新手,我能描述我以前的结果/问题的最好方式是类似于线程阻塞。使用Subject类带来了巨大的不同。

相关内容

  • 没有找到相关文章

最新更新