我希望标题足够清楚。我在玩Rx的想法,遇到了一个问题,我不知道如何解决。
我有一个原始形式的事件输入,比如{ 1, 2, 3, 4, 5, 1, 2, 3, 4 }
被转换成某种新形式的输入,比如IType
。假设每当观察到数字2
时,计时器开始滴答并插入一个新的转换值(或者,说,数字6
)到流,除非遇到数字5
,在这种情况下,计时器被取消。
我正在考虑构建一个从各种来源获取输入的状态机。如果我也能将"定时"事件作为流的一部分就好了。
代码如下,但基本上我目前坚持使用Observable的一些想法。生成(即使其看起来像Observable.Timer
,但返回/插入一个新的事件到输入序列)和/或可能像Observable.TakeUntil
, Observable.Create
和合并流,但我不确定是否有更好的方法,甚至如何在代码方面实现这一点。
在代码方面可以是这样的
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace RxTesting
{
public interface IType
{
int OriginalInput { get; set; }
}
public class SomeEvent: IType
{
public int OriginalInput { get; set; }
}
public class TimeEvent: IType
{
public int OriginalInput { get; set; }
}
class Program
{
static void Main(string[] args)
{
//Here we should have one TimerEvent after the last "2" in the sequence.
var input = new Subject<int>();
input.Select(i => new SomeEvent { OriginalInput = i }).SomeTimerOperatorHere().Subscribe(Console.WriteLine);
input.OnNext(1);
input.OnNext(2);
input.OnNext(3);
input.OnNext(4);
input.OnNext(5);
input.OnNext(1);
input.OnNext(2);
input.OnNext(3);
input.OnNext(4);
input.Dispose();
Console.ReadKey();
}
}
}
在控制台中应该打印一个RxTesting.TimerEvent
,因为有一个2
,之后没有数字5
取消其效果。
这应该可以了-我认为你的TakeUntil
想法是正确的。
我们正在过滤事件,只包括那些OriginalInput
为2的事件,并将每个事件投射到一个值为6的单值延迟流,如果OriginalInput
为5的事件到达,该流将被切断。生成的流的流被SelectMany
平铺。
public static IObservable<IType> SomeTimerOperatorHere(
this IObservable<SomeEvent> source)
{
var delay = TimeSpan.FromSeconds(1);
return Observable.Create<IType>(o => {
var sourcePub = source.Publish().RefCount();
return sourcePub
.Where(x => x.OriginalInput == 2)
.SelectMany(type =>
Observable.Return(new TimeEvent{ OriginalInput = 6 })
.Delay(delay)
.TakeUntil(sourcePub.Where(x => x.OriginalInput == 5))
).Subscribe(o);
});
}