我已经开始研究ReactiveX,但不知道它是否适合我试图解决的问题,因为要么我现在对ReactiveX不够了解,要么它没有我需要的东西。
比方说,我经常收到20种不同类型的信息。所有消息都应首先保存到数据库中。然后我需要进一步的分析。我对按顺序排列的A、B、C和D型很感兴趣(不必一个接一个(。当消息A出现时,应该将其视为我需要触发的流程的开始。然后我应该等待消息B(任何其他类型的消息都可以在等待时到达(到达并执行过程中的步骤。在消息B之后,我等待消息C并执行过程中的步骤。然后我等待消息D,它标志着过程的结束。然后我需要重新开始,等待消息A启动新流程。
我使用的是.NET,但来自任何平台的代码都可以确定如何(或是否(做到这一点。
更新:提供更多上下文
使用@Enigmatity示例代码,我将尝试稍微扩展一下这个问题。消息由设备产生。所以让我们假设在";A1、B2、B1、C1、F3"流的第一个字母是消息类型,数字是设备的ID。因此,消息A、B、C和D需要属于同一设备才能被视为匹配。服务器总是获取所有消息,因为设备会重复这些消息,直到得到确认。这就是单个设备可以产生的结果(流可以包含来自所有设备的混合消息(:
A1,B1,H1,F1,A1-这里的设备在完成它正在做的任何事情之前重新启动,所以首先A1,B1应该被忽略,我们现在开始重新等待A,B,C和D。
A1、B1、C1、B1——这是不可能发生的。A1总是在B、C或D之前。有时它可能不会到达D,但随后它会重新开始。
对于Rx.NET
的API跟踪RxJS
的情况,这很简单。假设我们已经有了所有信息的可观察性:
const ofType = theType => filter(({type}) => type === theType);
const a$ = messages$.pipe(ofType('a'));
const b$ = messages$.pipe(ofType('b'));
const c$ = messages$.pipe(ofType('c'));
const d$ = messages$.pipe(ofType('d'));
const handleB$ = b$.pipe(take(1), concatMap(bMsg => /* do the b step */));
const handleC$ = c$.pipe(take(1), concatMap(cMsg => /* do the c step */));
const waitForD$ = d$.pipe(take(1));
const process$ = a$.pipe(
// while we are handling this "a" message, ignore other "a" messages
exhaustMap((aMsg) => {
// these will execute sequentially. once complete, we go back to
// listening for "a" messages
return concat(handleB$, handleC$, waitForD$);
})
);
注意,CCD_;b";以及";c";步骤;d";消息如果您愿意,可以忽略或抑制此输出。
我发现Rx.NET可能缺少一个exhautMap实现。这里有一个SO问题来解决这个问题。
根据您的描述,我不确定是否可以保证您始终获得每个消息类型A、B、C和D,而不会获得另一组或重叠的值。我有两种方法,以防在最后一个D之前出现第二个a时重新启动出现问题。
这是我的基本代码设置:
var subject = new Subject<string>();
IObservable<(string a, string b, string c, string d)> query = ...
query.Subscribe(x => Console.WriteLine($"{x.a} {x.b} {x.c} {x.d}"));
"A1,B1,A2,C1,F1,D1,A3,A4,B2,B3,A5,C2,B4,F2,D2,D3,C3,D3"
.Split(',')
.ToObservable()
.Subscribe(subject);
这是一种情况,当所有东西都按顺序排列并完全匹配时(尽管中间穿插着其他类型的消息:
IObservable<(string a, string b, string c, string d)> query =
subject
.Do(x => { /* Save here */ })
.Publish(ss =>
{
var ssa = ss.Where(s => s[0] == 'A');
var ssb = ss.Where(s => s[0] == 'B');
var ssc = ss.Where(s => s[0] == 'C');
var ssd = ss.Where(s => s[0] == 'D');
return Observable.When(
ssa
.And(ssb)
.And(ssc)
.And(ssd)
.Then((a, b, c, d) => (a: a, b: b, c: c, d: d)));
});
该查询使用了Rx中非常强大但很少使用的模式/计划查询(也称为联接(。
如果你确实有问题,当消息出现故障时需要重置,并且你需要最新的消息,那么我认为这是有效的:
IObservable<(string a, string b, string c, string d)> query =
subject
.Do(x => { /* Save here */ })
.Publish(ss =>
ss
.Where(s => s[0] == 'A')
.Select(sa => ss.Where(s => s[0] == 'B').Select(sb => (a: sa, b: sb)))
.Switch()
.Select(sab => ss.Where(s => s[0] == 'C').Select(sc => (a: sab.a, b: sab.b, c: sc)))
.Switch()
.Select(sabc => ss.Where(s => s[0] == 'D').Select(sd => (a: sabc.a, b: sabc.b, c: sabc.c, d: sd)))
.Switch());
第一个查询给出的是:
A1 B1 C1 D1
A2 B2 C2 D2
A3 B3 C3 D3
一切都很好,很相配。
第二个给出的是:
A1 B1 C1 D1
A4 B3 C2 D2
A4 B3 C2 D3
A5 B4 C3 D3