Observable.Zip 当要压缩的序列数在运行时之前未知时



我需要对审批流程进行建模。以前很简单。两个角色必须批准某些内容,然后我们可以继续下一步:

public class Approved
{
    public string ApproverRole;
}
var approvals = Subscribe<Approved>();
var vpOfFinance = approvals.Where(e => e.ApproverRole == "Finance VP");
var vpOfSales = approvals.Where(e => e.ApproverRole == "Sales VP");
var approvedByAll = vpOfFinance.Zip(vpOfSales, Tuple.Create);
approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess());

但是现在有一个新的要求:批准某些内容所需的角色数量可能会有所不同:

public class ApprovalRequested
{
    public string[] Roles;
}
var approvalRequest = Subscribe<ApprovalRequested>().Take(1);
var approvals = Subscribe<Approved>();
var approvedByAll = ???;
approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess());

我觉得我在这里错过了一些非常明显的东西......谁能指出我正确的方向?

编辑

澄清一下:审批流程以每件商品为基础。审批可以到达的顺序未定义。我们不在乎一个角色是否多次批准一个项目。

这个问题基本上可以简化为从值流创建Set,其中值可能无序或本质上是许多值。

如果 N 是集合的基数,我们可以简单地假设,在至少推送 N 种类型的值(在本例中为 roles)之前,该过程不会继续。

下面是 Zip 运算符的示例解决方案;也许这可以帮助您入门:

    public static IObservable<IList<T>> Zip<T>(this IList<IObservable<T>> observables)
    {
        return Observable.Create<IList<T>>(observer =>
        {
            List<List<T>> store = new List<List<T>>(Enumerable.Range(1, observables.Count).Select(_ => new List<T>()));
            return new CompositeDisposable(observables.Select((o, i) => 
                o.Subscribe(value =>
                {
                    lock (store)
                    {
                        store[i].Add(value);
                        if (store.All(list => list.Count > 0))
                        {
                            observer.OnNext(store.Select(list => list[0]).ToList());
                            store.ForEach(list => list.RemoveAt(0));
                        }
                    }
                }))
            );
        });
    }

测试:

        Observable.Interval(TimeSpan.FromSeconds(0.5))
                  .GroupBy(i => i % 3)
                  .Select(gr => gr.AsObservable())
                  .Buffer(3)                      
                  .SelectMany(set => set.Zip())
                  .Subscribe(v => Console.WriteLine(String.Join(",", v)));

这里的一个问题是,在形成组时,您可能会失去初始值,因此您可能希望通过将方法重写为 IObservable<IList<T>> Zip<TKey, T>(this IGroupedObservable<TKey, T> observables) 来合并它。

在当前版本的 Rx(我从 NuGet 获得)中,有一个 Zip() 版本,它接受可观察量的集合并返回集合的可观察量。有了它,你可以做这样的事情:

string[] requiredApprovals = …;
var approvedByAll = requiredApprovals
    .Select(required => approvals.Where(a => a.ApproverRole == required))
    .Zip();
approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess());

但正如@Enigmativity指出的那样,只有当您可以确保每个人都以相同的顺序进行批准并且所有项目最终都将由所有必需的角色批准时,这才有效。如果没有,您将需要比Zip()更复杂的东西。

相关内容

  • 没有找到相关文章