我有一个来自RS-232端口的输入流和一个使用Rx流到串行端口的命令队列。我将代码简化如下:
void Init()
{
SerialPort srl;
... // open serial port
IObservable<string> obInput =
Observable.FromEventPattern<
SerialDataReceivedEventHandler,
SerialDataReceivedEventArgs>
(
handler => srl.DataReceived += handler,
handler => srl.DataReceived -= handler
).SelectMany(_ =>
{
List<string> ret;
... //extract messages
return ret;
}).Publish().Refcount();
obCommandOk =
obInput
.Where(msg => msg == "OK" || msg == "KO");
var sAction = new Subject<string>();
var sCommandOk = new Subject<Tuple<string,bool>>();
sAction
.Do(srl.WriteLine)
.Zip(obCommandOk, (cmd, result) =>
{
if (result == "OK")
sCommandOk.OnNext(Tuple.Create(cmd, true))
else
sCommandOk.OnNext(Tuple.Create(cmd, false))
});
}
async bool Command(string cmd)
{
sAction.OnNext(cmd);
return
await sCommandOk
.Where(t => t.Item1 == cmd)
.Select(t => t.Item2)
.FirstAsync();
}
有时发生OnNext之后,结果已经被推到sCommandOk,所以我失去了它。
你能给我建议一个更好的方法来避免失去回应吗?您的Command
方法中有一个竞争条件。您推送Action,然后订阅结果。如果结果是快速的,那么你就失去了它。
这里的一个小改变可以缓解这个问题:
async Task<bool> Command(string cmd)
{
var result = sCommandOk
.Where(t => t.Item1 == cmd)
.Select(t => t.Item2)
.FirstAsync();
sAction.OnNext(cmd);
return await result;
}
我认为你可以通过删除所有sCommandOk
主题来进一步优化
async Task<bool> Command(string cmd)
{
var result = obInput
.Where(t => t.Item1 == cmd)
.Select(t => t.Item2)
.FirstAsync();
sAction.OnNext(cmd);
return await result;
}