为避免使用已弃用的 API,下面这段代码的替代写法是什么?我也尝试过使用 assignTimeStamp 接口,但同样遇到了错误。
// Pipeline from the question: parse "<timestamp>,<value>" lines, use the parsed
// timestamp as event time, and add up the values of each 5-second tumbling window.
// The timestamp step still relies on AscendingTimestampExtractor, which is the
// API the question wants to move away from.
DataStream<Tuple2<Long, String>> sum = data
        .map(new MapFunction<String, Tuple2<Long, String>>() {
            /** Splits one comma-separated line into a (timestamp, value) pair. */
            @Override
            public Tuple2<Long, String> map(String line) {
                final String[] fields = line.split(",");
                final long eventTime = Long.parseLong(fields[0]);
                return new Tuple2<>(eventTime, fields[1]);
            }
        })
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, String>>() {
            /** Uses the first tuple field as the element's timestamp. */
            @Override
            public long extractAscendingTimestamp(Tuple2<Long, String> element) {
                return element.f0;
            }
        })
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
        .reduce(new ReduceFunction<Tuple2<Long, String>>() {
            /**
             * Adds the integer values carried in the second field of both tuples and
             * stamps the result with the wall-clock time at which it was computed.
             */
            @Override
            public Tuple2<Long, String> reduce(Tuple2<Long, String> left, Tuple2<Long, String> right) {
                final int total = Integer.parseInt(left.f1) + Integer.parseInt(right.f1);
                final long computedAt = new Timestamp(System.currentTimeMillis()).getTime();
                return new Tuple2<>(computedAt, String.valueOf(total));
            }
        });
您需要使用 WatermarkStrategy.forMonotonousTimestamps():
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.sql.Timestamp;
public class Test {
public static void main(String[] args) {
StreamExecutionEnvironment flink = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> data = flink.fromElements("hello", "world");
data.map((MapFunction<String, Tuple2<Long, String>>) s -> {
String[] words = s.split(",");
return new Tuple2<>(Long.parseLong(words[0]), words[1]);
}
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<Long, String>>forMonotonousTimestamps()
.withTimestampAssigner((SerializableTimestampAssigner<Tuple2<Long, String>>) (element, recordTimestamp) -> element.f0))
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce((ReduceFunction<Tuple2<Long, String>>) (t1, t2) -> {
int num1 = Integer.parseInt(t1.f1);
int num2 = Integer.parseInt(t2.f1);
int sum = num1 + num2;
Timestamp t = new Timestamp(System.currentTimeMillis());
return new Tuple2<>(t.getTime(), "" + sum);
});
}
}