从中间主题将 KStream 与 KTable 连接会导致异常



我正在尝试加入带有KTable的KStream。如果没有连接,我从中间主题"按 id 的书籍属性"阅读没有问题。

KTable 的示例味精:

{key: {id: 1}
 value: {id: 1, attribute_name: "weight"}}

KStream 的示例味精:

{key: {id: 1},
 value: {id: 1, book_id: 1, attribute_id: 1, value: 200}}

"最终聚合"主题的所需输出:

{key: {id: 1}, 
 value: {book_id: 1, attribute_name: "weight", value: 200}}
{key: {id: 1}, 
 value: {book_id: 1, attribute_name: "number_of_pages", value: 450}}

这是代码

    KStream<DefaultId, BookAttribute> bookAttributeStream = builder.stream(bookAttributeTopic, Consumed.with(defaultIdSerde, bookAttributeSerde));
    KStream<DefaultId, BookValueInt> bookValueIntStream = builder.stream(bookValueIntTopic, Consumed.with(defaultIdSerde, bookValueIntSerde));
    bookAttributeStream
        .selectKey((k, v) -> k.getId())
        .to("book-attribute-by-id", Produced.with(Serdes.Integer(), bookAttributeSerde));
    KTable<Integer, BookAttribute> bookAttributeByIdTable = builder.table("book-attribute-by-id", Consumed.with(Serdes.Integer(), bookAttributeSerde));
    // when the snippet below is commented out, consuming "book-attribute-by-id" works. 
    bookValueIntStream
        .selectKey((k, v) -> v.getAttribute_id())
        .join(bookAttributeByIdTable, (intValue, attribute) -> {
                System.out.println("intValue: " + intValue);
                System.out.println("attribute: " + attribute);
                return new BookAttributeValue(intValue, attribute);
            });

加入 KStream & KTable 时的例外情况:

线程"xxx-StreamThread-1"中的异常 org.apache.kafka.streams.errors.TopologyBuilderException: Invalid 拓扑构建:流线程 [xxx-流线程-1]找不到主题: 按 ID 预订属性 org.apache.kafka.streams.processor.internals.StreamPartitionAssignor$CopartitionedTopicsValidator.validate(StreamPartitionAssignor.java:792(

>我假设你正在使用kafka-streams 1.0.0

问题是您必须为流创建输入主题。

在您的案例中,主题是:book-attribute-by-id和变量值:bookAttributeTopicbookValueIntTopic

对于连接,Kafka Streams 必须确保连接主题中的分区数相等。当它尝试获取主题的元数据时,会引发异常:book-attribute-by-id

在运行应用程序之前,您必须手动创建book-attribute-by-id主题

在较新版本的 kafka 流中,在验证分区数之前会检查主题的存在。

最新更新