我有一个自定义状态计算,表示为Set<Long>
,当我的Datastream<Set<Long>>
看到Kafka中的新事件时,它将不断更新。现在,每次更新我的状态时,我都想将更新后的状态打印到stdout。想知道如何在Flink中做到这一点吗?对所有的窗口和触发器操作都有点困惑,我不断地得到以下错误。
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
我只想知道如何将我的聚合流Datastream<Set<Long>>
打印到stdout,或者将其写回另一个kafka主题?
下面是引发错误的代码片段。
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
DataStream<Set<Long>> stream = bsTableEnv.toAppendStream(kafkaSourceTable, Row.class)
stream
.aggregate(new MyCustomAggregation(100))
.process(new ProcessFunction<Set<Long>, Object>() {
@Override
public void processElement(Set<Long> value, Context ctx, Collector<Object> out) throws Exception {
System.out.println(value.toString());
}
});
使用Flink保持集合的状态可能非常昂贵,因为在某些情况下,集合将频繁进行序列化和反序列化。如果可能,最好使用Flink内置的ListState和MapState类型。
这里有一个例子说明了一些事情:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1L, 2L, 3L, 4L, 3L, 2L, 1L, 0L)
.keyBy(x -> 1)
.process(new KeyedProcessFunction<Integer, Long, List<Long>> () {
private transient MapState<Long, Boolean> set;
@Override
public void open(Configuration parameters) throws Exception {
set = getRuntimeContext().getMapState(new MapStateDescriptor<>("set", Long.class, Boolean.class));
}
@Override
public void processElement(Long x, Context context, Collector<List<Long>> out) throws Exception {
if (set.contains(x)) {
System.out.println("set contains " + x);
} else {
set.put(x, true);
List<Long> list = new ArrayList<>();
Iterator<Long> iter = set.keys().iterator();
iter.forEachRemaining(list::add);
out.collect(list);
}
}
})
.print();
env.execute();
}
请注意,我想使用键控状态,但事件中没有任何可用作键的内容,所以我只是用一个常量对流进行键控。这通常不是一个好主意,因为它阻止了并行处理——但由于您将聚合为一个集合,因此不能并行处理,因此不会造成任何危害。
我将一组Longs表示为MapState对象的键。当我想输出集合时,我会将其收集为列表。当我只想打印一些用于调试的东西时,我只需要使用System.out.
当我在IDE中运行此作业时,我看到的是:
[1]
[1, 2]
[1, 2, 3]
[1, 2, 3, 4]
set contains 3
set contains 2
set contains 1
[0, 1, 2, 3, 4]
如果您希望每秒查看MapState中的内容,可以在进程函数中使用Timer。