使用Rx与工人配对工作



作为Rx的新手,我试图实现一些概念上看似简单的东西,但我已经挣扎了一天,试图找出如何在Rx中实现它。

我有两个可观察器,一个是IObservable<Worker>,另一个是IObservable<Work>。这个想法是一种工作均衡器设置,通过一个可观察对象提交工作,并与在另一个可观测对象上提交的工作人员配对。

使用Observable.Zip可以很容易地实现简单的情况。但是,还有一项额外的要求正在进行中。

每个工人只能在有限的时间内接受工作(有一个窗口关闭,工人不再有资格接受工作,不应与工作项配对)。

这是我试图实现的目标的大理石图:

-----A------B-----C-----D----->(工人IOobservable<IOobservable<工人>>)||||||||-----------]|----]||||---------]||-----]--------1----------------2-3----->(工作I可观察<工作>)--------A1--------C2--D3-->(工人/工作对I可观察<成对工作>)

综上所述,要求:

  1. 当工作进入时,应将其分配给第一个在工作进入时窗口仍处于活动状态的工人。(在图中,我们有A1对,但没有B2,因为工人B在工作项2进入时已过期,C2成为正确的配对)

  2. 当一个工人与一个工作项配对时,它不应该有资格与后续的工作项配对(从某种意义上说,无论何时使用,它的窗口都应该强制关闭,但这是我最难实现的部分)。

  3. 如果工作进入,并且没有打开窗口的可用工作者,那么它应该等到下一个工作者被发出并与之配对(参见图中的D3)。

我很确定这在JoinGroupJoin中是可以实现的,但我很难弄清楚如何获得我想要的确切语义。如有任何帮助,我们将不胜感激。

对于这个任务,由于不确定的窗口和交叉求值,我对join不够信任。相反,我会为此编写一个自定义的zip运算符,其中第一个序列的值用一个时间限制值标记,zip算法只是丢弃第一个源的旧值。

下面是一个示例类ZipUntil和C#中的可运行程序。

它是无锁的,使用了Advanced RxJava中的概念,尽管我不明白同步取消在Rx.NET中是如何工作的(因为相同的模式在RxJava上不起作用)。

我怀疑这是最优的,但我认为它会起作用。

public static IObservable<Tuple<Worker, Work>> PairWorkers(IObservable<IObservable<Worker>> workerObservable, IObservable<Work> workObservable)
{
    var joined = workerObservable.Join(workObservable,
        worker => worker,
        work => Observable<Work>.Never(),
        (worker, work) => Tuple.Create(worker.FirstAsync(), work)
    );
    var distinct = joined.Distinct(t => t.Item2);
    return distinct;
}

相关内容

  • 没有找到相关文章

最新更新