在以下代码中
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
WatermarkStrategy<T> watermarkStrategy) {
final WatermarkStrategy<T> cleanedStrategy = clean(watermarkStrategy);
// match parallelism to input, to have a 1:1 source -> timestamps/watermarks relationship
// and chain
final int inputParallelism = getTransformation().getParallelism();
final TimestampsAndWatermarksTransformation<T> transformation =
new TimestampsAndWatermarksTransformation<>(
"Timestamps/Watermarks",
inputParallelism,
getTransformation(),
cleanedStrategy);
getExecutionEnvironment().addOperator(transformation);
return new SingleOutputStreamOperator<>(getExecutionEnvironment(), transformation);
}
assignTimestampsAndWatermarks()
接收主流,并根据params中指定的策略分配时间戳和水印,最后返回SingleOutputStreamOperator
,CCD_2是更新后的流,生成了时间戳和标记。
我的问题是,TimestampsAndWatermarksTransformation
在这里(内部(做了什么,这条线getExecutionEnvironment().addOperator(transformation);
的效果如何。
当您在流上调用assignTimestampsAndWatermarks
时,此代码会在作业图中添加一个运算符来进行时间戳提取和水印生成。这是将事情连接起来,以便实际完成指定的水印。
内部有两种类型的转换:(1(物理转换,如map
或assignTimestampsAndWatermarks
,它会更改流记录;(2(逻辑转换,如CCD 8,它只影响拓扑。