使用流的请求/响应



我有一个来自RS-232端口的输入流和一个使用Rx流到串行端口的命令队列。我将代码简化如下:

void Init()
{
    SerialPort srl;
    ... // open serial port
    IObservable<string> obInput =
        Observable.FromEventPattern<
        SerialDataReceivedEventHandler,
        SerialDataReceivedEventArgs>
        (
            handler => srl.DataReceived += handler,
            handler => srl.DataReceived -= handler
        ).SelectMany(_ =>
        {
        List<string> ret;
        ... //extract messages
        return ret;
        }).Publish().Refcount();
    obCommandOk =
        obInput
        .Where(msg => msg == "OK" || msg == "KO");
    var sAction = new Subject<string>();
    var sCommandOk = new Subject<Tuple<string,bool>>();
    sAction
        .Do(srl.WriteLine)
        .Zip(obCommandOk, (cmd, result) =>
        {
        if (result == "OK")
            sCommandOk.OnNext(Tuple.Create(cmd, true))
        else
            sCommandOk.OnNext(Tuple.Create(cmd, false))
        });
}
async bool Command(string cmd)
{
    sAction.OnNext(cmd);
    return
        await sCommandOk
        .Where(t => t.Item1 == cmd)
        .Select(t => t.Item2)
        .FirstAsync();
}

有时发生OnNext之后,结果已经被推到sCommandOk,所以我失去了它。

你能给我建议一个更好的方法来避免失去回应吗?

您的Command方法中有一个竞争条件。您推送Action,然后订阅结果。如果结果是快速的,那么你就失去了它。

这里的一个小改变可以缓解这个问题:

async Task<bool> Command(string cmd)
{
    var result = sCommandOk
        .Where(t => t.Item1 == cmd)
        .Select(t => t.Item2)
        .FirstAsync();
    sAction.OnNext(cmd);
    return await result;
}

我认为你可以通过删除所有sCommandOk主题来进一步优化

async Task<bool> Command(string cmd)
{
    var result = obInput
        .Where(t => t.Item1 == cmd)
        .Select(t => t.Item2)
        .FirstAsync();
    sAction.OnNext(cmd);
    return await result;
}

相关内容

  • 没有找到相关文章