The following code receives messages on a socket, counts them over a 1-minute window sliding every 10 seconds, and enriches each input element with the cached count.
Processing is in event time: the messages I receive contain the timestamp I want to use for processing.
This is close to the training exercise: https://training.ververica.com/exercises/eventTimeJoin.html
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
// Input
SocketTextStreamFunction source = new SocketTextStreamFunction("localhost", 9092, "\n", 0);
SingleOutputStreamOperator<Tuple2<String, Long>> input = env.addSource(source)
.map(x -> {
// Eg: 123;2019-11-29T16:03:44+01:00
String[] split = x.split(";");
LocalDateTime ldt = LocalDateTime.parse(split[1], DateTimeFormatter.ISO_OFFSET_DATE_TIME);
long timestamp = ldt.atZone(ZoneOffset.systemDefault()).toInstant().toEpochMilli();
return new Tuple2<>(split[0], timestamp);
});
// Assign timestamp
input = input.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.milliseconds(100)) {
@Override
public long extractTimestamp(Tuple2<String, Long> element) {
return element.f1;
}
});
input.print("Received");
// Count the nb of inputs in the last minute, sliding by 10s
SingleOutputStreamOperator<Tuple2<String, Integer>> count = input
.map(x -> new Tuple2<>(x.f0, 1))
.keyBy(0)
.timeWindow(Time.minutes(1), Time.seconds(10))
.sum(1);
count.print("Count");
// Connect the input and the count
SingleOutputStreamOperator inputWithCount = input
.keyBy(0)
.connect(count.keyBy(0))
.process(
new CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Integer>, Tuple3<String, Long, Integer>>() {
private ValueState<Integer> countCache;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("count", Integer.class);
countCache = getRuntimeContext().getState(desc);
}
@Override
public void processElement1(Tuple2<String, Long> value, Context ctx, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
Integer cached = countCache.value();
if (cached == null) {
cached = 0;
}
out.collect(new Tuple3<>(value.f0, value.f1, cached));
}
@Override
public void processElement2(Tuple2<String, Integer> value, Context ctx, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
countCache.update(value.f1);
}
});
inputWithCount.print("Output");
env.execute("Test");
// I did not include the imports, and I pretty-printed the Map function for clarity
# Start server:
ncat -lk --broker 9092
# Check what's received:
nc localhost 9092
# I run the Flink app, and use the following command
echo "123;$(date -Iseconds)" | nc 0.0.0.0 9092 ;
echo "123;$(date -Iseconds)" | nc 0.0.0.0 9092 ;
sleep 20s ;
echo "123;$(date -Iseconds)" | nc 0.0.0.0 9092
Now, when I send 2 lines, wait 20 seconds, and send another line, I expect the count value for the first 2 inputs to be 0, and the count for the third input to be 2. The first expectation holds, but not the second.
Received> (123,1575043933000)
Received> (123,1575043933000)
Output> (123,1575043933000,0)
Output> (123,1575043933000,0)
... # 20s later
Received> (123,1575043953000)
Output> (123,1575043953000,0)
Count> (123,2)
Count> (123,2)
I would have expected the count to be processed before the 3rd element is output. Am I misunderstanding event time, or am I doing something wrong in my code?
(This follows up on David Anderson's explanation and only offers an alternative solution; please read his post first.)
If your example is close to your real data (with large lags between events), introducing some kind of idle timeout is also an option. For some use cases this is also the recommended way of dealing with empty Kafka partitions.
public static class BoundedOutOfOrdernessWithTimeoutTimestampExtractor
implements AssignerWithPeriodicWatermarks<FakeKafkaRecord> {
private static final long serialVersionUID = 1L;
private final long maxOutOfOrderness;
private final long idle;
private long recordTimestamp;
BoundedOutOfOrdernessWithTimeoutTimestampExtractor(Time maxOutOfOrderness, Time idle) {
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
this.idle = idle.toMilliseconds();
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(Math.max(recordTimestamp - maxOutOfOrderness, System.currentTimeMillis() - idle));
}
@Override
public long extractTimestamp(FakeKafkaRecord record, long previousElementTimestamp) {
return recordTimestamp = record.getTimestamp();
}
}
The timestamp assigner is queried for a new watermark according to your watermark interval.
env.getConfig().setAutoWatermarkInterval(100);
If the BoundedOutOfOrdernessWithTimeoutTimestampExtractor receives no event during idle, it advances the watermark accordingly. You probably want to set idle to maxOutOfOrderness (100 ms).
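As a minimal usage sketch (the DataStream<FakeKafkaRecord> named records is a hypothetical source, not part of the original answer):
DataStream<FakeKafkaRecord> withWatermarks = records.assignTimestampsAndWatermarks(
        // allow 100 ms of out-of-orderness, and advance the watermark after 100 ms without events
        new BoundedOutOfOrdernessWithTimeoutTimestampExtractor(
                Time.milliseconds(100), Time.milliseconds(100)));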
The problem is that you have not done anything to guarantee that the count is processed before the third element is emitted, and in fact it almost certainly will not be.
The reason is that the watermark cannot advance far enough to trigger the window until the third event arrives. It does not matter that you waited 20 seconds of real time; what matters is that no events passed through the timestamp extractor during that period, so there was no basis for advancing the watermark.
Furthermore, BoundedOutOfOrdernessTimestampExtractor is a periodic watermark generator that, by default, only creates a new watermark every 200 ms. This means your third event will most likely be processed by the CoProcessFunction before the watermark that triggers the window has been generated.
You would get more deterministic watermarking if you switched to a punctuated watermark generator, but the watermark would still follow the 3rd event, so it still would not produce the result you expect.
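For illustration only, here is a minimal sketch of such a punctuated generator, assuming the question's Tuple2<String, Long> element type (the class name is mine, not from the original post):
public static class PerEventWatermarkAssigner
        implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {
    private static final long serialVersionUID = 1L;

    @Override
    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
        return element.f1;
    }

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {
        // Emit a watermark right after every element. The watermark still travels behind
        // the element itself, so the window only fires after the triggering event has
        // already passed the CoProcessFunction, as explained above.
        return new Watermark(extractedTimestamp);
    }
}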
Thanks David and Arvid!
What I had not understood until now is that watermarks are generated as events enter the system (whereas processing time is automatic and follows the server clock). So once the source becomes idle, nothing more happens. That is exactly what the documentation says, but I had missed it.
In the following specific case, I get the output I expect:
echo "123;$(date -Iseconds)" | nc 0.0.0.0 9092 ;
echo "123;$(date -Iseconds)" | nc 0.0.0.0 9092 ;
sleep 20s ;
echo "456;$(date -Iseconds)" | nc 0.0.0.0 9092 ;
sleep 1s ;
echo "123;$(date -Iseconds)" | nc 0.0.0.0 9092
Received> (123,1575640032000)
Received> (123,1575640032000)
Output> (123,1575640032000,0)
Output> (123,1575640032000,0)
... # 20s later
Received> (456,1575640052000)
Output> (456,1575640052000,0)
Count> (123,2)
Count> (123,2)
... # 1s later
Received> (123,1575640053000)
Output> (123,1575640053000,2)
I find it a bit counter-intuitive that my output can change depending on whether I receive other events. In my use case I expect a continuous stream of input, but I would still prefer stable behavior.
Using your watermark function, Arvid, I get the behavior I want, thanks. I am still not sure whether I would be able to replay a batch of inputs. I would think so, but I am still wrapping my head around watermarks and event time.
Since it is not a bundled function, it makes me wonder whether I am using Flink the wrong way?
For reference, here is the code I ended up with.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
// Input
SocketTextStreamFunction source = new SocketTextStreamFunction("localhost", 9092, "\n", 0);
SingleOutputStreamOperator<Tuple2<String, Long>> input = env.addSource(source)
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
// Eg: 123;2019-11-29T16:03:44+01:00
String[] split = value.split(";");
LocalDateTime ldt = LocalDateTime.parse(split[1], DateTimeFormatter.ISO_OFFSET_DATE_TIME);
long timestamp = ldt.atZone(ZoneOffset.systemDefault()).toInstant().toEpochMilli();
return new Tuple2<>(split[0], timestamp);
}
});
// Assign timestamp
input = input.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessWithTimeoutTimestampExtractor(Time.milliseconds(10), Time.milliseconds(10)));
input.print("Received");
// Count the nb of inputs in the last minute, sliding by 10s
SingleOutputStreamOperator<Tuple2<String, Integer>> count = input
.map(new MapFunction<Tuple2<String, Long>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Tuple2<String, Long> x) throws Exception {
return new Tuple2<>(x.f0, 1);
}
})
.keyBy(0)
.timeWindow(Time.minutes(1), Time.seconds(10))
.sum(1);
count.print("Count");
// Connect the input and the count
SingleOutputStreamOperator<Tuple3<String, Long, Integer>> inputWithCount = input
.keyBy(0)
.connect(count.keyBy(0))
.process(
new CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Integer>, Tuple3<String, Long, Integer>>() {
private ValueState<Integer> countCache;
private long previousCountTimestamp;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("count", Integer.class);
countCache = getRuntimeContext().getState(desc);
}
@Override
public void processElement1(Tuple2<String, Long> input, Context ctx,
Collector<Tuple3<String, Long, Integer>> out) throws Exception {
Integer cached = countCache.value();
if (cached == null) {
cached = 0;
}
out.collect(new Tuple3<>(input.f0, input.f1, cached));
}
@Override
public void processElement2(Tuple2<String, Integer> count, Context ctx,
Collector<Tuple3<String, Long, Integer>> out) throws Exception {
countCache.update(count.f1);
// Push the cache expiry back: drop the previously registered timer and register a new one,
// one window length (1 minute) after this count's timestamp.
ctx.timerService().deleteEventTimeTimer(previousCountTimestamp);
previousCountTimestamp = ctx.timestamp() + Time.minutes(1).toMilliseconds();
ctx.timerService().registerEventTimeTimer(previousCountTimestamp);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Integer>> out)
throws Exception {
System.out.println("Cache expires");
countCache.clear();
}
});
inputWithCount.print("Output");
env.execute("Test");
As a side note, I had to make the cache expire.
With the same input, the output is:
echo "123;$(date -Iseconds)" | nc 0.0.0.0 9092 ;
echo "123;$(date -Iseconds)" | nc 0.0.0.0 9092 ;
sleep 20s ;
echo "456;$(date -Iseconds)" | nc 0.0.0.0 9092 ;
sleep 1s ;
echo "123;$(date -Iseconds)" | nc 0.0.0.0 9092
Received> (123,1575641582000)
Received> (123,1575641582000)
Output> (123,1575641582000,0)
Output> (123,1575641582000,0)
... # few s later
Count> (123,2)
... # 10s later
Count> (123,2)
... # few s later
Received> (456,1575641602000)
Output> (456,1575641602000,0)
Received> (123,1575641603000)
Output> (123,1575641603000,2)
... # few s later
Count> (123,3)
Count> (456,1)
... # 10s later
Count> (456,1)
Count> (123,3)
... # 10s later
Count> (456,1)
Count> (123,3)
... # 10s later
Count> (123,3)
Count> (456,1)
...