我有一个GeoLocationProvider
(它实现了IObservable<System.Device.Location.GeoCoordinate>
,每x毫秒输出当前位置。
现在我想使用RX读取所有这些GPS坐标,并且只有在位置变化很重要的情况下才通知订阅者(例如-行进距离> 10米)。
My "Main" code
// [...]
IObservable<GeoCoordinate> locationProvider = new GeoLocationProvider();
LocationFeed locationFeed = new LocationFeed(locationProvider);
// register any interested observers on the locationFeed.
ConsoleLocationReporter c1 = new ConsoleLocationReporter("reporter0001");
locationFeed.Subscribe(c1);
我的LocationFeed实现如下:
using System;
using System.Device.Location;
using System.Reactive.Subjects;
namespace My.Namespace.Movement
{
public class LocationFeed : ISubject<GeoCoordinate>, IDisposable
{
private readonly IDisposable _subscription;
private readonly Subject<GeoCoordinate> _subject;
public LocationFeed(IObservable<GeoCoordinate> observableSource)
{
_subject = new Subject<GeoCoordinate>();
_subscription = observableSource.Subscribe(_subject); // TODO: Add logic to filter to only significant movement changes (> 10m)
}
public void Dispose()
{
_subscription?.Dispose();
_subject?.Dispose();
}
public void OnNext(GeoCoordinate value)
{
_subject.OnNext(value);
}
public void OnError(Exception error)
{
_subject.OnError(error);
}
public void OnCompleted()
{
_subject.OnCompleted();
}
public IDisposable Subscribe(IObserver<GeoCoordinate> observer)
{
return _subject.Subscribe(observer);
}
}
}
问题1: GeoCoordinate提供了计算两个坐标之间距离的方法c1.DistanceTo(c2)
。我只想报告(发布)新的GeoCoordinates,如果阈值与最后一个推大于x。我如何实现这一点?
问题2: subject的用法是否正确?我实现issubject的方式是否正确?我不想在我的"主"代码中添加所有的连接,并将它们全部移动到一个单独的类中。
我强烈建议不要实现ISubject<T>
(或者IObservable<T>
或IObserver<T>
)。相反,尝试组合现有的工厂和类型,然后将它们公开为"has a"关系,而不是"is a"关系。
可以看到,LocationFeed
纯粹是对observableSource
参数的包装,因此似乎无法解决任何问题。我建议删掉它。
关于你发布的问题,一个解决方案是使用大小为2的缓冲区和步长为1。
IObservable<GeoCoordinate> locationProvider = new GeoLocationProvider();
locationProvider
.Buffer(2,1)
.Where(buffer=>buffer[0].DistanceTo(buffer[1]) > 10)
.Select(buffer=>buffer[1])
.Subscribe(
pos => Console.WriteLine(pos),
ex => { },
() => {});
或者可以使用Scan
IObservable<GeoCoordinate> locationProvider = new GeoLocationProvider();
locationProvider
.Scan(Tuple.Create(GeoCoordinate.Zero,GeoCoordinate.Zero), (acc, cur)=>Tuple.Create(acc.Item2, cur))
.Where(pair=>pair.Item1.DistanceTo(pair.Item2) > 10)
.Select(pair=>pair.Item2)
.Subscribe(
pos => Console.WriteLine(pos),
ex => { },
() => {});
我不确定你对第一个价值的要求是什么。该不该发表?
编辑:下面是一个经过测试的解决方案(使用Point
类型),它将在发生"重大"更改时推送Unit
。如果这不是您想要的,那么您应该可以修改它以获得您真正想要的
void Main()
{
var zero = new System.Drawing.Point(0,0);
var fenceDistance = 10;
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
ReactiveTest.OnNext(1, new System.Drawing.Point(0,0)),
ReactiveTest.OnNext(2, new System.Drawing.Point(0,9)), //Not far enough
ReactiveTest.OnNext(3, new System.Drawing.Point(0,10)), //Touches the fence
ReactiveTest.OnNext(4, new System.Drawing.Point(0,15)), //Not far enough
ReactiveTest.OnNext(5, new System.Drawing.Point(0,40)) //Breaches the fence
);
var observer = scheduler.CreateObserver<Unit>();
source
.Scan(Tuple.Create(zero, zero), (acc, cur) =>
{
if (DistanceBetween(acc.Item1, cur) >= fenceDistance)
{
return Tuple.Create(cur, cur);
}
else
{
return Tuple.Create(acc.Item1, cur);
}
})
.Where(pair => pair.Item1 == pair.Item2)
.Select(pair => Unit.Default)
.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(new[] {
ReactiveTest.OnNext(1, Unit.Default),
ReactiveTest.OnNext(3, Unit.Default),
ReactiveTest.OnNext(5, Unit.Default)
},observer.Messages);
}
// Define other methods and classes here
public static double DistanceBetween(System.Drawing.Point a, System.Drawing.Point b)
{
var xDelta = a.X -b.X;
var yDelta = a.Y - b.Y;
var distanceSqr = (xDelta * xDelta) + (yDelta * yDelta);
return Math.Sqrt(distanceSqr);
}