我正在尝试对在不同线程上无序到达的事件进行重新排序。
是否可以创建与这些大理石图匹配的反应式扩展查询:
s1 1 2 3 4
s2 1 3 2 4
result 1 2 3 4
和。。。
s1 1 2 3 4
s2 4 3 2 1
result 1234
即:仅按版本号顺序发布结果。
我得到的最接近的是每次 s1 滴答时使用 Join 打开一个窗口,并且仅在 s2 到达时以相同的数字关闭它。
喜欢这个:
var publishedEvents = events.Publish().RefCount();
publishedEvents.Join(
publishedEvents.Scan(0, (i, o) => i + 1),
expectedVersion => publishedEvents.Any(@event => @event.Version == expectedVersion),
_ => Observable.Never<Unit>(),
(@event, expectedVersion) => new {@event,expectedVersion})
.Where(x => x.expectedVersion == x.@event.Version)
.Select(x => x.@event)
.Subscribe(Persist);
但这不适用于图表 2。第 2 组将在 s2 与数字 2 刻度时完成,因此在 1 之前完成。
有意义吗?可以用 Rx 完成吗?应该吗?
编辑:我想这就像重叠的窗口,在之前的所有窗口都关闭之前,后面的窗口无法关闭。在窗口编号与事件版本号匹配之前,前面的窗口不会关闭。
编辑2:
我现在有这样的东西,但它并不是我希望的响应式、功能性、线程安全的 LINQ 启示(请忽略我的事件现在是 JObjects):
var orderedEvents = Observable.Create<JObject>(observer =>
{
var nextVersionExpected = 1;
var previousEvents = new List<JObject>();
return events
.ObserveOn(Scheduler.CurrentThread)
.Subscribe(@event =>
{
previousEvents.Add(@event);
var version = (long) @event["Version"];
if (version != nextVersionExpected) return;
foreach (var previousEvent in previousEvents.OrderBy(x => (long) x["Version"]).ToList())
{
if ((long) previousEvent["Version"] != nextVersionExpected)
break;
observer.OnNext(previousEvent);
previousEvents.Remove(previousEvent);
nextVersionExpected++;
}
});
});
简介
这个问题的关键是排序。无论如何,你看它,需要某种形式的缓冲。虽然毫无疑问,一些精心设计的运算符组合可能会实现这一目标,但我认为这是一个很好的例子,Observable.Create
是一个很好的选择。
通用化解决方案
我已经做了一些努力来概括我接受任何类型的排序键的方法。为此,我希望得到:
- 用于获取类型为
Func<TSource,TKey>
的事件的键的键选择器函数 - 类型
TKey
的初始密钥 - 用于按顺序获取类型为
Func<TKey,TKey>
的下一个键的函数 - 一个结果选择器,用于从源流中的配对事件生成结果,类型为
Func<TSource,TSource,TSource>
由于我只是使用从 1 开始的整数序列进行测试,因此可以满足以下条件:
- 键选择器:
i => i
- 第一键:
1
- 下一页:
k => k+1
- 结果选择器:
(left,right) => left
排序
这是我Sort
尝试。它将事件缓冲到字典中,并尽快将它们刷新给订阅者:
public static IObservable<TSource> Sort<TSource, TKey>
(this IObservable<TSource> source,
Func<TSource, TKey> keySelector,
TKey firstKey,
Func<TKey, TKey> nextKeyFunc)
{
return Observable.Create<TSource>(o =>
{
var nextKey = firstKey;
var buffer = new Dictionary<TKey, TSource>();
return source.Subscribe(i =>
{
if (keySelector(i).Equals(nextKey))
{
nextKey = nextKeyFunc(nextKey);
o.OnNext(i);
TSource nextValue;
while (buffer.TryGetValue(nextKey, out nextValue))
{
buffer.Remove(nextKey);
o.OnNext(nextValue);
nextKey = nextKeyFunc(nextKey);
}
}
else buffer.Add(keySelector(i), i);
});
});
}
我不得不说这是一个非常幼稚的实现。在过去的生产代码中,我已经通过特定的错误处理、固定大小的缓冲区和超时来详细阐述这一点,以防止资源泄漏。但是,对于此示例,它将起作用。:)
排序后(对不起!),我们现在可以看看处理多个流。
合并结果
第一次尝试
我的第一个尝试是生成一个无序的事件流,这些事件流已被看到所需的次数。然后可以对此进行排序。我通过按键对元素进行分组来做到这一点,使用GroupByUntil
来保持每个组,直到捕获了两个元素。然后,每个组都是同一键的结果流。对于整数事件的简单示例,我可以只取每个组的最后一个元素。但是,我不喜欢这样,因为对于更真实的场景来说,每个结果流都可能贡献一些有用的东西,这很尴尬。为了感兴趣,我包含代码。请注意,为了可以在这次和我的第二次尝试之间共享测试,我接受一个未使用的 resultSelector 参数:
public static IObservable<TSource> OrderedCollect<TSource, TKey>
(this IObservable<TSource> left,
IObservable<TSource> right,
Func<TSource, TKey> keySelector,
TKey firstKey,
Func<TKey, TKey> nextKeyFunc
Func<TSource,TSource,TSource> resultSelector)
{
return left.Merge(right)
.GroupByUntil(keySelector, x => x.Take(2).LastAsync())
.SelectMany(x => x.LastAsync())
.Sort(keySelector, firstKey, nextKeyFunc);
}
旁白:您可以破解SelectMany
子句来决定如何选择结果。与第二次尝试相比,此解决方案的一个优点是,在具有许多结果流的场景中,更容易看到如何扩展它以选择三个结果元组中的前两个到达。
第二次尝试
对于这种方法,我独立地对每个流进行排序,然后将结果Zip
在一起。这不仅是一个看起来更简单的操作,而且以有趣的方式组合每个流的结果也容易得多。为了使测试与我的第一种方法兼容,我选择了 resultSelector 函数以使用第一个流的事件作为结果,但显然您可以灵活地在您的场景中执行一些有用的操作:
public static IObservable<TSource> OrderedCollect<TSource, TKey>
(this IObservable<TSource> left,
IObservable<TSource> right,
Func<TSource, TKey> keySelector,
TKey firstKey,
Func<TKey, TKey> nextKeyFunc,
Func<TSource, TSource, TSource> resultSelector)
{
return Observable.Zip(
left.Sort(keySelector, firstKey, nextKeyFunc),
right.Sort(keySelector, firstKey, nextKeyFunc),
resultSelector);
}
旁白:不难看出如何将此代码扩展到接受任意数量的输入流的更一般的情况,但如前所述,使用 Zip
使得在给定键处阻塞非常不灵活,直到所有流的结果都进入。
测试用例
最后,这是我的测试,与您的示例场景相呼应。若要运行这些,请导入 nuget 包rx-testing
和nunit
,并将上面的实现放入静态类中:
public class ReorderingEventsTests : ReactiveTest
{
[Test]
public void ReorderingTest1()
{
var scheduler = new TestScheduler();
var s1 = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(400, 3),
OnNext(500, 4));
var s2 = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 3),
OnNext(300, 2),
OnNext(500, 4));
var results = scheduler.CreateObserver<int>();
s1.OrderedCollect(
right: s2,
keySelector: i => i,
firstKey: 1,
nextKeyFunc: i => i + 1,
resultSelector: (left,right) => left).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1),
OnNext(300, 2),
OnNext(400, 3),
OnNext(500, 4));
}
[Test]
public void ReorderingTest2()
{
var scheduler = new TestScheduler();
var s1 = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3),
OnNext(400, 4));
var s2 = scheduler.CreateColdObservable(
OnNext(100, 4),
OnNext(200, 3),
OnNext(300, 2),
OnNext(400, 1));
var results = scheduler.CreateObserver<int>();
s1.OrderedCollect(
right: s2,
keySelector: i => i,
firstKey: 1,
nextKeyFunc: i => i + 1,
resultSelector: (left, right) => left).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(400, 1),
OnNext(400, 2),
OnNext(400, 3),
OnNext(400, 4));
}
}
咖喱以避免重复
最后评论,因为我讨厌在代码中重复自己,所以这里有一个调整,避免了我在第二种方法中重复调用Sort
的方式。我没有把它包含在正文中,以避免混淆不熟悉咖喱的读者:
public static IObservable<TSource> OrderedCollect<TSource, TKey>
(this IObservable<TSource> left,
IObservable<TSource> right,
Func<TSource, TKey> keySelector,
TKey firstKey,
Func<TKey, TKey> nextKeyFunc,
Func<TSource, TSource, TSource> resultSelector)
{
Func<IObservable<TSource>, IObservable<TSource>> curriedSort =
events => events.Sort(keySelector, firstKey, nextKeyFunc);
return Observable.Zip(
curriedSort(left),
curriedSort(right),
resultSelector);
}