C#反应式扩展(Rx)-如何组合流和存储状态变量以生成自定义输出



我需要一些帮助来编写两个执行类似任务的IObservable(下面描述为"正常触发器"one_answers"驻留触发器")。我有工作输入流,但我不知道如何将它们组合起来,执行分析(例如,保留"停留触发器"的状态变量,以存储第一个感兴趣的coord以及我们的进度百分比),然后产生不同类型的新输出,所有这些都在可观察的范围内进行了描述。让我描述一下我试图写的内容:

正常触发器:

INPUT=两个流:1.点(x,y坐标)的不可靠热流,例如鼠标位置。每秒的点数可能略有波动,或者流可能在一段时间内不产生值。2.布尔值的热门流。

输出:数据的组合(来自流1的一个点和来自流2的一个值),它将是一个双值,代表一个百分比(仅为100%或0%)。只有满足这些条件,才会产生输出;在触发流上接收信号,并且coords流上的最后一个信号在配置的时间跨度内。另一种说法是,我们收到了一个触发器,并有一个不过时的coords。

Coords stream: -x-x-x-x-x---x-x-x-x------x-x-x-x-x-x-x-
Trigger stream:----------x------------x-------x--------
Result stream: ----------x--------------------x--------

停留触发器:

INPUT=一个流:1.点(x,y坐标)的不可靠热流,例如鼠标位置。每秒的点数可能略有波动,或者流可能在一段时间内不产生值。

输出:与"Normal Trigger"中的输出类型相同(流1中的一个点和一个新的派生值)。只有满足这些条件,才会产生输出;coords流在一定的时间跨度内提供了coords,其中coords都在一个小区域内,并且值之间没有大的间隙,例如,我在0.1秒内收到了一致的、未中断的coords,这些coords都与同一小区域有关,并且任何coords之间都没有超过0.01秒的间隙。然后,我希望产生一个输出,其中包括来自未断链的第一个coord和一个double,表示我们离目标还有10%的距离。如果我们在同一区域再收到0.1秒的coord,并且没有太大的差距,那么我希望从完整的链中输出相同的第一个coord,再加一个double,以表明我们离目标有20%的距离。如果遇到大的缺口,或者坐标从初始区域移开,那么我们报告0%,放弃我们的进度和初始坐标,继续寻找足够长(0.1秒)的连续链。如果我们达到了100%的目标,那么这就是输出,并重新开始寻找(从0%开始)。

Coords stream: -x-x-x-x-x---x-x-x-x------x-x-x-x-x-x-x-
Result stream: ---------x------------------------x-----

这可能很难理解,所以让我解释一下这些是为了什么。坐标是屏幕上的位置,第一个"正常触发器"正试图在按下按钮时捕捉位置(按下按钮即为触发器)。在第二种情况("停留触发器")中,没有按钮按下,因此我们希望捕捉鼠标指针停留在同一小区域("井")时的位置。然而,这是渐进的,所以我们想识别鼠标位置何时在一小段时间内保持一致,记录它在相同位置所需时间的10%,然后如果它在相同区域中,记录我们在相同区域的20%,等等,直到我们对鼠标在同一区域保持足够长的时间以输出100%感到高兴,这意味着用户已经在同一位置"停留"了足够长的时间来登记他们的兴趣。然后,我们开始再次寻找,看看他们再次住在哪里,可能在同一个地方。

"停留触发器"对我来说似乎更具挑战性,因为我认为你需要完成以下所有操作;

  • 保留最后一个坐标,以便能够检查当前坐标和最后一个coord之间的时间(以检测任何"大间隙")
  • 保留任何未断链中的第一个coord,这样我们就可以a) 将当前coord位置与链中第一个coord的位置进行比较,以检查当前coord是否在此链中的第一个cood的可接受距离内b) 根据我们可以接受的第一个coord和当前coord之间经过的时间来计算我们的进度c) 输出带有我们新进度百分比的第一个coord
  • 在流上有某种间隔,如果在流中检测到"大间隙",则允许我们输出0%

对于coord和触发器流,我有工作的Rx可观察性。伪代码/我应该在哪里寻找的描述将不胜感激。

感谢您阅读我的博文!Julius

解决方案:

"正常触发器">

根据Christopher关于使用Observable.Create的建议,我提出了以下建议。我不知道这是否被认为是创建流的"黑客"方式,最好用另一组Rx方法来描述我正在做的事情,但这是有效的。请注意,功能与我所描述的略有不同;

  1. 我决定总是输出触发器会更好,但如果该点足够新鲜,它会伴随一个点,即我们总是可以看到用户何时点击了鼠标按钮(或触发信号是什么),但只有在它没有过时的情况下,我们才能看到与此相关的点
  2. 输出信号不是100%或0%,而是1(表示100%)和-1(表示-100%,当遇到相反的触发时发生)。例如,按下鼠标按钮DOWN可能是1,但释放它可能是-1。

    public static IObservable<TriggerSignalWithPoint> CombineWithPointIfRelevent(
    this IObservable<bool> triggerSource,
    IObservable<Timestamped<Point>> pointsSource,
    TimeSpan pointsMaxReleventAge)
    {
    return Observable.Create<TriggerSignalWithPoint>(subj =>
    {
    bool disposed = false;
    Timestamped<Point>? latestPoint = null;
    Action disposeChildSubscriptions = null;
    var pointsSubscription = pointsSource.Subscribe(
    timestampedPoint =>
    {
    latestPoint = timestampedPoint;
    },
    ex =>
    {
    subj.OnError(ex);
    disposeChildSubscriptions();
    });
    var triggerSubscription = triggerSource
    .Where(_ => disposed == false)
    .Subscribe(
    b =>
    {
    Point? latestUsefulPoint =
    latestPoint.HasValue &&
    DateTimeOffset.Now.Subtract(latestPoint.Value.Timestamp) <= pointsMaxReleventAge
    ? latestPoint.Value.Value
    : (Point?) null;
    float signal = b ? 1 : -1;
    subj.OnNext(new TriggerSignalWithPoint(signal, latestUsefulPoint));
    },
    ex =>
    {
    subj.OnError(ex);
    disposeChildSubscriptions();
    },
    () =>
    {
    subj.OnCompleted();
    disposeChildSubscriptions();
    });
    disposeChildSubscriptions = () =>
    {
    disposed = true;
    if (triggerSubscription != null)
    {
    triggerSubscription.Dispose();
    triggerSubscription = null;
    }
    if (pointsSubscription != null)
    {
    pointsSubscription.Dispose();
    pointsSubscription = null;
    }
    };
    return disposeChildSubscriptions;
    });
    }
    

N.B.此解决方案改编自Paul Bett在此处的回答;CombineLatest,但仅推动左侧

任何批评或帮助都会非常有帮助,因为我只是在刷Rx的表面。

我有几个问题。

  1. 居住时间百分比的通知是每100毫秒发生一次,还是在该区域100毫秒后的第一次光标移动事件上
  2. 第一个光标移动事件是否符合0%的条件,因此,如果100ms后发生,保留在该区域中的第二个光标移动活动是否会导致10%/0.1d?还是第一个光标事件以某种方式跳到了10%

我对上述问题做了一些假设,并假设一个区域中至少需要两个事件(我称之为围栏)才能生成百分比。我还假设我们只会对光标移动事件做出反应,并对它们进行时间戳以进行任何时间分析。

话虽如此,我已经拼凑了一组Rx单元测试和一个模型,这些测试和模型对于构建解决方案可能很有用。

首先,我从具有X/Y属性的经典Point类开始

public class Point
{
public int X { get; set; }
public int Y { get; set; }
}

然后我创建了一个模型,希望它能封装我们试图解决的问题。我不确定这是否是这个类别的最佳名称

class Dweller
{
private static readonly TimeSpan BigGapPeriod = TimeSpan.FromSeconds(0.5);
private readonly Point _startLocation;
private readonly DateTimeOffset _startTime;
private readonly DateTimeOffset _currentTime;
private readonly TimeSpan _durationInFence;
private static readonly TimeSpan CompleteTime = TimeSpan.FromSeconds(1);
public Dweller()
: this(new Point(), DateTimeOffset.MinValue, DateTimeOffset.MinValue)
{ }
private Dweller(Point startLocation, DateTimeOffset startTime, DateTimeOffset currentTime)
{
_startLocation = startLocation;
_startTime = startTime;
_currentTime = currentTime;
_durationInFence = currentTime - _startTime;
}
public TimeSpan DurationInFence
{
get { return _durationInFence; }
}
public double Percentage
{
get { return RoundDown(Math.Min(_durationInFence.Ticks / (double)CompleteTime.Ticks, 1.0), 1); }
}
public Dweller CreateNext(Point location, DateTimeOffset now)
{
if (IsInitialValue() || !IsWithinFence(location) || HasCompleted() || IsNewSequence(now))
{
return new Dweller(location, now, now);
}
return new Dweller(_startLocation, _startTime, now);
}
private bool IsNewSequence(DateTimeOffset now)
{
return now > (_currentTime + BigGapPeriod);
}
private bool HasCompleted()
{
return Percentage == 1.0;
}
private bool IsInitialValue()
{
return _startTime == DateTimeOffset.MinValue;
}
private bool IsWithinFence(Point point)
{
//Put your own logic here
return Math.Abs(point.X - _startLocation.X) < 100
&& Math.Abs(point.Y - _startLocation.Y) < 100;
}
private static double RoundDown(double i, double decimalPlaces)
{
var power = Math.Pow(10, decimalPlaces);
return Math.Floor(i * power) / power;
}
}

我通过这些单元测试建立了这个。请注意,我使用TestScheduler来伪造时间。这意味着测试可以非常快速地运行,并且我不需要任何Thread.Sleep或WaitHandle原语。

[TestFixture]
public class DwellTriggerTests : ReactiveTest
{
//Need a predicate to break the fence. Maybe we actually want to know how long we have been within the fence?
//  Scan-> home cord, start time, current duration
//
public IObservable<double> Query(IObservable<Point> coords, IScheduler scheduler)
{
return coords.Scan(
new Dweller(),
(acc, cur) => acc.CreateNext(cur, scheduler.Now))
.Select(dweller => dweller.Percentage)
.DistinctUntilChanged()
.Where(percentage => percentage > 0.0);
}
[Test]
public void Trigger_10Percent_after_100ms_of_mouse_position_within_fence()
{
//Assuming the fence is fixed from the first position, and isn't constantly reevaluated for each new position
var testScheduler = new TestScheduler();
var observer = testScheduler.CreateObserver<double>();
var coords = testScheduler.CreateColdObservable(
OnNext(020.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(040.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(060.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(080.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(100.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(120.Milliseconds(), new Point { X = 100, Y = 100 })
);
Query(coords, testScheduler).Subscribe(observer);
testScheduler.Start();
observer.Messages.AssertEqual(
OnNext(120.Milliseconds(), 0.1)
);
}
[Test]
public void Trigger_20Percent_after_200ms_of_mouse_position_within_fence()
{
//Assuming the fence is fixed from the first position, and isn't constantly reevaluated for each new position
var testScheduler = new TestScheduler();
var observer = testScheduler.CreateObserver<double>();
var coords = testScheduler.CreateColdObservable(
OnNext(020.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(120.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(220.Milliseconds(), new Point { X = 100, Y = 100 })
);
Query(coords, testScheduler).Subscribe(observer);
testScheduler.Start();
observer.Messages.AssertEqual(
OnNext(120.Milliseconds(), 0.1),
OnNext(220.Milliseconds(), 0.2)
);
}
[Test]
public void Trigger_100Percent_after_1000ms_of_mouse_position_within_fence()
{
//Assuming the fence is fixed from the first position, and isn't constantly reevaluated for each new position
var testScheduler = new TestScheduler();
var observer = testScheduler.CreateObserver<double>();
var coords = testScheduler.CreateColdObservable(
OnNext(020.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(220.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(420.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(620.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(820.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(1020.Milliseconds(), new Point { X = 100, Y = 100 })
);
Query(coords, testScheduler).Subscribe(observer);
testScheduler.Start();
observer.Messages.AssertEqual(
OnNext(220.Milliseconds(), 0.2),
OnNext(420.Milliseconds(), 0.4),
OnNext(620.Milliseconds(), 0.6),
OnNext(820.Milliseconds(), 0.8),
OnNext(1020.Milliseconds(), 1.0)
);
}
[Test]
public void Reset_after_sequence_hits_100Percent()
{
//Assuming the fence is fixed from the first position, and isn't constantly reevaluated for each new position
var testScheduler = new TestScheduler();
var observer = testScheduler.CreateObserver<double>();
var coords = testScheduler.CreateColdObservable(
OnNext(020.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(220.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(420.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(620.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(820.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(1020.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(1120.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(1220.Milliseconds(), new Point { X = 100, Y = 100 })
);
Query(coords, testScheduler).Subscribe(observer);
testScheduler.Start();
observer.Messages.AssertEqual(
OnNext(220.Milliseconds(), 0.2),
OnNext(420.Milliseconds(), 0.4),
OnNext(620.Milliseconds(), 0.6),
OnNext(820.Milliseconds(), 0.8),
OnNext(1020.Milliseconds(), 1.0),
OnNext(1220.Milliseconds(), 0.1)
);
}
[Test]
public void Reset_if_period_of_500ms_of_silence_occurs()
{
//Assuming the fence is fixed from the first position, and isn't constantly reevaluated for each new position
var testScheduler = new TestScheduler();
var observer = testScheduler.CreateObserver<double>();
var coords = testScheduler.CreateColdObservable(
OnNext(020.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(120.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(621.Milliseconds(), new Point { X = 100, Y = 100 }),
OnNext(721.Milliseconds(), new Point { X = 100, Y = 100 })
);
Query(coords, testScheduler).Subscribe(observer);
testScheduler.Start();
observer.Messages.AssertEqual(
OnNext(120.Milliseconds(), 0.1),
OnNext(721.Milliseconds(), 0.1)
);
}
}
public static class TestExtentions
{
public static long Milliseconds(this int input)
{
return TimeSpan.FromMilliseconds(input).Ticks;
}
public static long Seconds(this int input)
{
return TimeSpan.FromSeconds(input).Ticks;
}
}

如果有帮助,请使用语句添加这些

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

这应该适用于正常触发器:

public static IObservable<double> NormalTrigger<T>(this IObservable<T> source, IObservable<bool> trigger, TimeSpan window)
{
return source.Select(s =>
trigger
.Take(window)
.Take(1)
.Select(t => t ? 100.0 : 0.0))
.Switch();  
}

编辑:测试代码,按预期工作

var trigger = new Subject<bool>();
coords.NormalTrigger(trigger, TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);
coords.OnNext(new Point(0, 0));
System.Threading.Thread.Sleep(1100);
trigger.OnNext(true);  // Shouldn't trigger
coords.OnNext(new Point(1.01, 0));
System.Threading.Thread.Sleep(50);
coords.OnNext(new Point(1.02, 1));
System.Threading.Thread.Sleep(50);
coords.OnNext(new Point(1.03, 1));
System.Threading.Thread.Sleep(50);
trigger.OnNext(false);  // Should trigger 0
coords.OnNext(new Point(0.5, 0.5));
System.Threading.Thread.Sleep(50);
trigger.OnNext(true);  // Should trigger 100
coords.OnNext(new Point(1.04, 1));
System.Threading.Thread.Sleep(50);
coords.OnNext(new Point(2.05, 40));
System.Threading.Thread.Sleep(50);
coords.OnNext(new Point(0.06, 0));
System.Threading.Thread.Sleep(50);
coords.OnNext(new Point(1.07, 0));
System.Threading.Thread.Sleep(50);
coords.OnNext(new Point(0.08, 0));
System.Threading.Thread.Sleep(1100);
trigger.OnNext(true);
Console.ReadLine();

最新更新