假设有两个可观察量o1, o2
。第一个从内部进程接收事件(在很长的计算完成后),第二个通过 REST 端点接收外部事件(表示另一个外部组件也已完成)。事件数据只是一个 ID。
现在我想设计一个工作流,以便只有当两个可观察量中都存在 ID 时,才会发出新事件(即当内部和外部计算完成时)。
让在某个时间点o1
包含 ID{1,2,3}
,然后我想区分这些情况:
-
正常情况:例如 ID
2
到达o2
。两个 ID 现在都存在于两个可观察量中,输出"成功:2"> -
过期情况:内部计算完成后一段时间,外部事件尚未到达。 例如,ID
2
存在于o1
中,但即使在一小时后也不存在于o2
中,输出:">已过期:2"> -
未知情况:ID (例如 4)通过 REST 端点到达
o2
,该端点在o1
中不存在,可能是因为 ID 已经过期或仅仅是因为外部组件故障,输出:">未知:3">
我找到了groupJoin
运算符,它可能可以做我想做的,这里甚至是一个属性匹配的示例: 组联接 - 通过其中一个属性连接两个流匹配
但是,此示例似乎每次有新事件到达时都会对所有元素执行耗尽(线性时间)扫描。我认为可以滚动我自己的版本,以恒定时间检查地图,但是:我想知道是否有规范的方法,甚至是开箱即用的功能(因为我想这是一个非常常见的用例)。
(由于我是 Rx 的新手,实现此类连接操作的过期情况的最佳方法是什么)
我会通过在外部对象中具有中间状态来做到这一点:
public class ItemJoinCache<T> {
private Map<Integer, T> items;
public Observable<T> ingestInternal(T item) {
// an internal item arrived, do the necessary work
}
public Observable<T> ingestExternal(T item) {
// an external item arrived, do the necessary work
}
}
externalRestCallThatReturnsObservable()
.flatMap(myItemJoinCache::ingestExternal)
...
internalProcessThatTakesALongTime()
.flatMap(myItemJoinCache::ingestInternal)
...
这样,您可以执行可能需要的任何处理。
你也标记了这个问题 rx.net,所以我会假设用 C# 给出答案。我不确定这如何转化为Java,如果这就是你要找的。
Rx的Join
和GroupJoin
并不是真的为了这个:它们应该根据时间窗口加入。您希望通过 ID 加入。
对 Rx 友好的解决方案将是功能性的。由于你需要一些状态,所以我们可以使用一个不可变的状态,嵌入到Scan
函数中。在 C# 中,有来自 Nuget 包System.Collections.Immutable
ImmutableDictionary<TKey, TItem>
。我不确定Java中是否有等效的。
给定这些类:
public class CustomEvent
{
public int Id { get; set; }
}
public class Result
{
public ResultType Type { get; set; }
public int Id { get; set; }
}
public enum ResultType
{
Success,
Unknown,
Expired
}
您可以获得这样的解决方案:
IObservable<CustomEvent> o1;
IObservable<int> o2;
TimeSpan expirationTimeDelay = TimeSpan.FromHours(1);
IObservable<Result> results = Observable.Merge(
o1.SelectMany(ce => Observable.Merge(
Observable.Return(new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h =>
Tuple.Create(h.Add(ce.Id, ce), default(Result), false)
)),
Observable.Return(new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h =>
h.ContainsKey(ce.Id)
? Tuple.Create(h.Remove(ce.Id), new Result { Type = ResultType.Expired, Id = ce.Id}, true)
: Tuple.Create(h, default(Result), false)
))
.Delay(expirationTimeDelay)
)),
o2.Select(id => new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h =>
h.ContainsKey(id)
? Tuple.Create(h.Remove(id), new Result { Type = ResultType.Success, Id = id }, true)
: Tuple.Create(h, new Result { Type = ResultType.Unknown, Id = id }, true)
))
)
.Scan(Tuple.Create(ImmutableDictionary<int, CustomEvent>.Empty, default(Result), false), (t, f) => f(t.Item1))
.Where(t => t.Item3)
.Select(t => t.Item2);
不可变字典是我们的核心状态,保存来自o1
的"实时"事件。累加器函数返回一个具有三个属性的元组:表示我们核心状态的不可变字典、结果对象和布尔值。布尔对象是一个筛选器,显示是否应传播结果对象。
Scan
的一个有趣的技巧是反转正常用法:将项目流转换为脱离状态的函数。在我们的例子中,函数的类型是 Func、Tuple、Results、Boolean>>(一个接受字典并返回包含三个值的元组的函数)。
这就是我们在这里所做的:每个o1
项弹出两个函数:一个将该项添加到不可变字典中(并且不推送结果)。另一个功能在一小时后出现,以查看事件是否尚未加入。如果加入,则不会发生任何事情。如果未加入,则会弹出过期结果。每个o2
项目都会弹出一个功能:检查该项目是否在地图中。如果存在,将弹出正常结果。如果不存在,则为未知。
如果你在Java中,并且没有容易获得的等同于ImmutableDictionary
,那么你可能可以替换一个常规的HashMap
,但你必须通过Publish
调用来防止来自多个订阅者的令人讨厌的状态问题。
您始终可以将 o1 简化为带有scan
的集合。当 o2 发出一个值时,您从 o1 中获取最新的集合,并withLatestFrom
并检查包含。timeout
可以解决过期部分。RxJs 5 中的示例:
o2
.withLatestFrom(
o1.scan((set, val) => set.add(val), new Set),
(o2Val, o1Set) => o1Set.has(o2Val) ? "Success" : "Unknown"
)
.timeoutWith(3600000, Observable.of("Expire"))
.subscribe(console.log)