我有一个通过串行端口与计算机通信的设备。发送"START"命令后,设备会响应确认并开始监视某些外部活动。然后,它根据该外部活动在串行端口上异步传输一些消息。当设备收到"STOP"命令时,它会以确认作为响应,然后停止发送更多消息(表示外部活动(。
我已经使用冷可观察量实现了启动/停止命令,这些命令执行副作用(在串行端口上发送命令(,并在串行端口上收到 ackowledge 时发出单个Unit.Default
。我想构造一个发出与外部活动对应的消息的IObservable
,并在订阅时执行"START"副作用,在释放订阅时执行"STOP"副作用。"开始"很容易,我只需要做一个"SelectMany",但我不知道如何执行"停止"。
class MonitoringDevice
{
private SerialPort _sp;
private IObservable<byte> _receivedBytes;
public IObservable<ExternalActivity> ActivityStream { get; }
public MonitoringDevice()
{
_sp = new SerialPort("COM1");
_receivedBytes = Observable
.FromEventPattern<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
h =>
{
_sp.DiscardInBuffer();
_sp.DataReceived += h;
},
h =>
{
_sp.DataReceived -= h;
})
.SelectMany(x =>
{
byte[] buffer = new byte[1024];
var ret = new List<byte>();
int bytesRead = 0;
do
{
bytesRead = _sp.Read(buffer, 0, buffer.Length);
ret.AddRange(buffer.Take(bytesRead));
} while ((bytesRead >= buffer.Length));
return ret;
})
.Publish()
.RefCount();
ActivityStream = StartMonitoringAsync()
.SelectMany( _receivedBytes.ToActivity());
// we need to execute StopMonitoringAsync
// when a subscription to ActivityStream is disposed
_sp.Open();
}
private IObservable<Unit> StartMonitoringAsync()
{
return Observable
.Create<Unit>(
obs =>
{
_sp.Write("START");
return _receivedBytes
.ToAcknowledge()
.FirstAsync()
.Timeout(TimeSpan.FromMilliseconds(1000))
.Subscribe(obs);
});
}
private IObservable<Unit> StopMonitoringAsync()
{
return Observable
.Create<Unit>(
obs =>
{
_sp.Write("STOP");
return _receivedBytes
.ToAcknowledge()
.FirstAsync()
.Timeout(TimeSpan.FromMilliseconds(1000))
.Subscribe(obs);
});
}
}
ExternalActivity
只是一个POCO。
ToAcknowledge
是一种扩展方法,返回在设备传输确认时发出Unit.Default
的IObservable
。 - 这正在按预期工作;
ToActivity
是一种扩展方法,返回一个IObservable
,用于分析传入的串行数据并发出ExternalActivity
对象。 - 这正在按预期工作;
编辑:添加了ToAcknowledge
和ToActivity
扩展方法的实现。
public static IObservable<Unit> ToAcknowledge(this IObservable<byte> source)
{
return source.Buffer(3, 1)
.Where(bfr => bfr.SequenceEqual(new byte[] { 65, 67, 75 })) // ACK
.Select(x => Unit.Default);
}
public static IObservable<ExternalActivity> ToActivity(this IObservable<byte> source)
{
return source
.Publish(ps => ps.Buffer(ps.Where(x => x == 1), // SOH
bo => ps.Where(x => x == 4))) // EOT
.Select(bfr => bfr.Take(bfr.Count - 1).Skip(1))
.Where(bfr => bfr.Count() == 12)
.Select(bfr =>
{
var timestamp = BitConverter.ToInt64(bfr.Take(8).ToArray(), 0);
var id = BitConverter.ToInt32(bfr.ToArray(), 8);
return new ExternalActivity(timestamp, id);
});
}
您可以将StartAsync
修改为:
private IObservable<Unit> StartAsync(Action unsubscribe)
{
return
Observable
.Create<Unit>(o =>
{
var subscription =
Observable
.Timer(TimeSpan.FromSeconds(1))
.Select(_=> Unit.Default)
.Subscribe(o);
return new CompositeDisposable(
subscription,
Disposable.Create(unsubscribe));
});;
}
然后,您可以注入任何您喜欢的Action unsubscribe
。
尝试使用以下代码进行测试:
var subscription =
StartAsync(() => Console.WriteLine("Done"))
.Subscribe();
Thread.Sleep(3000);
subscription.Dispose();
您将在 3 秒后看到"完成"写入主机。