我有一个Topology,它从topicA
读取事件,并进行一些处理,在此过程中,它使用System.currentTimeMillis
将其中一个字段设置为当前时间戳,并将结果发送到topicB
。此应用程序已将WallclockTimestampExtractor
设置为默认值。
我设置了另一个从两个主题读取的拓扑,以测量每个事件类型的延迟,但是,我从processorContext.timestamp
获得的时间戳在我在有效负载上设置的处理时间戳之前。据我所知,WallclockTimestampExtractor将为事件设置时间戳,该时间戳将等于将事件放入主题中的时间,因此不可能将该时间提前到处理时间。
我错过了什么?
在主题上设置message.timestamp.type=LogAppendTime
为我解决了问题