我的用例是识别在X时间后没有实时接收到预期事件的实体,而不是使用批处理作业。例如:
如果我们在时间T收到了PaymentInitiated事件,但是在T+X没有收到PaymentFailed/PaymentAborted/paymentsucceeded,那么触发一个触发器说PaymentStuck以及PaymentInitiated事件的细节。
我如何在Apache Storm中建模这样的用例,因为它在每个事件上滚动时间周期X,而不是固定的时间间隔。
谢谢,哈瑞
对于Storm,需要使用低级Java API将所有逻辑放入UDF代码中(我怀疑Trindent是否有帮助)。我从未与Samza合作过,也无法为它提供任何帮助(或判断哪个系统更适合您的问题)。
例如,在Storm中,您可以为Spout.nextTuple()
中的每个元组分配时间戳,并按时间戳降序缓冲Bolt中未完成支付的所有元组。每次调用Bolt.execute()
时,您都可以将新元组的时间戳与队列的头部(即最老的元组)进行比较。如果输入元组的时间戳大于head- t + X,你知道你的head元组超时了,你可以为它触发触发器。
当然,您需要执行fieldsGrouping()
以确保属于同一支付的所有元组都由同一Bolt实例处理。您可能还需要按时间戳对传入的螺栓元组进行排序,或者使用更高级的超时逻辑来处理无序元组(关于增加时间戳)。
根据您的延迟需求和输入流速率,您还可以使用"滴答元组"来触发头部元组与此虚拟滴答元组的比较。或者作为更严格的实现,直接在Spout.next()
中执行所有这些逻辑(如果您知道支付的所有元组都经过相同的Spout实例)。