我正在尝试使用KStream KTable leftJoin来用主题B丰富主题A中的项目。主题A是我的KStream,主题B是我的KTtable,它有大约2300万条记录。这两个主题的键都没有进行数学运算,所以我必须使用reducer将KStream(主题B(转换为KTable。
这是我的代码:
KTable<String, String> ktable = streamsBuilder
.stream("TopicB", Consumed.withTimestampExtractor(new customTimestampsExtractor()))
.filter((key, value) -> {...})
.transform(new KeyTransformer()) // generate new key
.groupByKey()
.reduce((aggValue, newValue) -> {...});
streamBuilder
.stream("TopicA")
.filter((key, value) -> {...})
.transform(...)
.leftJoin(ktable, new ValueJoiner({...}))
.transform(...)
.to("result")
1( KTable初始化是缓慢的。(大约2000 msg/s(,这正常吗?我的主题是只有一个分区。有什么方法可以提高性能吗?我试图设置以下内容来减少写吞吐量,但似乎没有太大的改善。
CACHE_MAX_BYTES_BUFFERING_CONFIG = 10 * 1024 * 1024
COMMIT_INTERVAL_MS_CONFIG = 15 * 1000
2( 当KTable未从主题B加载完毕时,将发生联接。这是发生联接时的偏移量(CURRENT-offset/LOG-END-offset(
Topic A: 32725/32726 (Lag 1)
Topic B: 1818686/23190390 (Lag 21371704)
我检查了失败的主题A的记录的时间戳,它是4天前的记录,而处理的主题B的最后一条记录是6天前。据我所知,kstream处理记录是基于时间戳的,我不明白为什么在我的情况下,KStrea(主题A(没有等到KTable(主题B(完全加载到4天前才触发加入。
我还尝试过设置时间戳提取器返回0,但效果不太好。
更新:当将时间戳设置为0时,我收到以下错误:
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerID are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
我还尝试将max.task.idle.ms设置为>0(3秒30分钟(,但仍然出现同样的错误。
更新:我通过将customTimestampsExtractor设置为6天前修复了"UnknownProducterIdException"错误,该时间仍然早于主题A中的记录。我认为(不确定(设置为0会触发导致此错误的更改日志上的保留期。然而,在ktable完成加载之前,联接仍然不起作用。为什么?
我使用的是Kafka Streams 2.3.0。
我在这里做错什么了吗?非常感谢。
1.KTable初始化速度较慢。(大约2000 msg/s(,这正常吗?
这取决于您的网络,我认为限制是TopicB的消耗率,您使用的两个配置CACHE_MAX_BYTES_BUFFERING_CONFIG
和COMMIT_INTERVAL_MS_CONFIG
是为了在您想要生成多少KTable输出(因为KTable变更日志是修订流(和您将KTable更新到底层主题和下游处理器时接受多少延迟之间进行权衡。详细查看状态存储的Kafka Streams缓存配置和本博客部分Tables, Not Triggers
。
我认为提高TopicB使用率的好方法是添加更多的分区。
KStream.leftJoin(KTable,...)
总是表查找,它总是将当前流记录与KTable上最新更新的记录连接起来,在决定是否加入时不会考虑流时间。如果您想在加入时考虑流时间,请查看KStream KStream加入
在您的情况下,此滞后是TopicB
的滞后,并不意味着KTable未完全加载。当您的KTable处于状态恢复过程中时,当它从KTable的底层更改日志主题中读取时,它没有完全加载,以便在实际运行流应用程序之前恢复当前状态,以防您无法执行联接,因为在状态完全恢复之前,流应用程序不会运行。