作为Rx的新手,我试图实现一些概念上看似简单的东西,但我已经挣扎了一天,试图找出如何在Rx中实现它。
我有两个可观察器,一个是IObservable<Worker>
,另一个是IObservable<Work>
。这个想法是一种工作均衡器设置,通过一个可观察对象提交工作,并与在另一个可观测对象上提交的工作人员配对。
使用Observable.Zip
可以很容易地实现简单的情况。但是,还有一项额外的要求正在进行中。
每个工人只能在有限的时间内接受工作(有一个窗口关闭,工人不再有资格接受工作,不应与工作项配对)。
这是我试图实现的目标的大理石图:
-----A------B-----C-----D----->(工人IOobservable<IOobservable<工人>>)||||||||-----------]|----]||||---------]||-----]--------1----------------2-3----->(工作I可观察<工作>)--------A1--------C2--D3-->(工人/工作对I可观察<成对工作>)
综上所述,要求:
-
当工作进入时,应将其分配给第一个在工作进入时窗口仍处于活动状态的工人。(在图中,我们有A1对,但没有B2,因为工人B在工作项2进入时已过期,C2成为正确的配对)
-
当一个工人与一个工作项配对时,它不应该有资格与后续的工作项配对(从某种意义上说,无论何时使用,它的窗口都应该强制关闭,但这是我最难实现的部分)。
-
如果工作进入,并且没有打开窗口的可用工作者,那么它应该等到下一个工作者被发出并与之配对(参见图中的D3)。
我很确定这在Join
或GroupJoin
中是可以实现的,但我很难弄清楚如何获得我想要的确切语义。如有任何帮助,我们将不胜感激。
对于这个任务,由于不确定的窗口和交叉求值,我对join
不够信任。相反,我会为此编写一个自定义的zip运算符,其中第一个序列的值用一个时间限制值标记,zip算法只是丢弃第一个源的旧值。
下面是一个示例类ZipUntil
和C#中的可运行程序。
它是无锁的,使用了Advanced RxJava中的概念,尽管我不明白同步取消在Rx.NET中是如何工作的(因为相同的模式在RxJava上不起作用)。
我怀疑这是最优的,但我认为它会起作用。
public static IObservable<Tuple<Worker, Work>> PairWorkers(IObservable<IObservable<Worker>> workerObservable, IObservable<Work> workObservable)
{
var joined = workerObservable.Join(workObservable,
worker => worker,
work => Observable<Work>.Never(),
(worker, work) => Tuple.Create(worker.FirstAsync(), work)
);
var distinct = joined.Distinct(t => t.Item2);
return distinct;
}