如果没有恒定的输入记录流,那么带有宽限期和抑制的会话窗口的Kafka流似乎无法输出最终事件。
上下文:我们使用更改数据捕获(CDC(来监视对遗留数据库的更改。当用户使用UI进行更改时,数据库事务将更改1..n个表。每个SQL语句都会产生一个Kafka记录。为了创建一个用于启动昂贵流程的"触发记录",需要对这些记录进行聚合。该过程应该在提交遗留数据库中事务的一秒钟内启动。只有少数用户在使用旧应用程序,因此事务之间可能有相当长的时间。
我们有一个Kafka Stream应用程序,它使用会话窗口和400ms的非活动间隙来聚合共享同一密钥(事务ID(的传入记录,并输出触发记录。
我们有一个有效的解决方案,但只有当其他事务正在运行时,触发记录才会写入输出主题,以便生成稳定的传入记录流我们需要关闭窗口并写入触发记录,即使没有进一步的输入记录
工作代码如下:https://github.com/maxant/kafka-data-consistency/blob/714a44689fd48aa28b05b855638ac7ed50dd5ee9/partners/src/main/java/ch/maxant/kdc/partners/ThroughputTestStream.java#L65
以下是该代码的摘要:
stream.groupByKey()
.windowedBy(SessionWindows.with(Duration.ofMillis(400)).grace(Duration.ofMillis(0)))
.aggregate(...)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream((k,v) -> k.key())
.to("throughput-test-aggregated");
起初,我没有压抑,也没有宽限期。仅使用默认配置,我总是收到包含所有聚合记录的窗口的最终事件,但在400ms窗口之后,它需要长达6秒的时间,这对我们来说太长了,无法等待。
为了减少延迟并加快速度,我将CACHE_MAX_BYTES_BUFFERING_CONFIG设置为1,但这会在每次聚合后产生一个输出记录,而不仅仅是单个输出记录。
我引入了抑制(以及0ms的宽限期(,以确保只创建一个输出记录。
现在的问题是,如果新的输入记录在窗口关闭后到达(无论它们的键如何(,我只会收到一条输出记录
该测试创建了10个输入记录,所有记录都使用相同的键,间隔10ms,均在100ms内。然后它休息了3秒,让我在一组十张唱片后关掉它。我希望收到一个输出记录,但没有一个到达,除非我让测试继续运行,以创建第二组输入记录。这个问题是可以重现的。
我已经阅读了以下文章,但找不到任何描述我所看到的内容的内容,即只有在处理了其他记录(无论关键字如何(后,最终记录才会发送到输出主题。
- https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/
- https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+能力+抑制+更新+为+KTables
- https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html
即使不处理其他记录,我也必须更改什么才能将最终记录发送到我的输出主题
(在Linux上使用Kafka 2.4.1与客户端和服务器(
更新:我在拓扑中有一个错误,修复了
我在使用抑制时遇到了和你完全相同的问题,这是预期的行为。因为suppress只支持使用流时间而不是墙时钟时间发射缓冲记录,所以如果停止获取新记录,流时间将冻结,Suppress
将不会发射最后一个被抑制的窗口。
我使用的解决方案是使用处理器API编写一个自定义抑制(使用Transformer,这样您就可以使用DSL向下游发送支持的记录(,并将状态存储用作缓冲区,然后每当有新记录进入或经过一段时间间隔后(使用WALL_CLOCK_TIME
标点符号(,检查应该向下游处理器刷新(或发射(哪些窗口。
变压器看起来是这样的:
public class SuppressWindowTransformer implements Transformer<Windowed<String>, String, KeyValue<Windowed<String>, String>> {
private ProcessorContext context;
private Cancellable cancellable;
private KeyValueStore<Windowed<String>, String> kvStore;
@Override
public void init(ProcessorContext context) {
this.context = context;
kvStore = (KeyValueStore) context.getStateStore("suppressed_windowed_transaction");
cancellable = context.schedule(Duration.ofMillis(100), PunctuationType.WALL_CLOCK_TIME, timestamp -> flushOldWindow());
}
@Override
public KeyValue<Windowed<String>, String> transform(Windowed<String> key, String value) {
kvStore.put(key, value);//buffer (or suppress) the new in coming window
flushOldWindow();
return null;
}
private void flushOldWindow() {
//your logic to check for old windows in kvStore then flush
//forward (or unsuppressed) your suppressed records downstream using ProcessorContext.forward(key, value)
}
@Override
public void close() {
cancellable.cancel();//cancel punctuate
}
}
在您的流DSL中:
stream.groupByKey()
.windowedBy(SessionWindows.with(Duration.ofMillis(400)).grace(Duration.ofMillis(0)))
.aggregate(...)//remove suppress operator and write custom suppress using processor API
.toStream()
.transform(SuppressWindowTransformer::new, "suppressed_windowed_transaction")
.to("throughput-test-aggregated");