我目前正在尝试升级应用于数据流的方法assignTimestampsAndWatermarks
。数据流看起来像这样:
DataStream<Auction> auctions = env.addSource(new AuctionSourceFunction(auctionSrcRates))
.name("Custom Source")
.setParallelism(params.getInt("p-auction-source", 1))
.assignTimestampsAndWatermarks(new AuctionTimestampAssigner());
AssignerWithPeriodicWatermark如下所示:
private static final class AuctionTimestampAssigner implements AssignerWithPeriodicWatermarks<Auction> {
private long maxTimestamp = Long.MIN_VALUE;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(maxTimestamp);
}
@Override
public long extractTimestamp(Auction element, long previousElementTimestamp) {
maxTimestamp = Math.max(maxTimestamp, element.dateTime);
return element.dateTime;
}
}
要从不推荐的调用升级到当前的最佳实践,我需要采取哪些步骤?谢谢
您的水印生成器假设事件按时间戳排列有序,或者至少接受任何无序事件都会延迟。这相当于
assignTimestampsAndWatermarks(
WatermarkStrategy
.<Auction>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.dateTime))