Kafka Streams: state store not initialized during left join



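I am trying to join two Kafka topics. One is a KStream and the other is a KTable. The left join complains that the processor's state store does not exist. I have looked at many code examples on the Kafka GitHub and elsewhere, and in none of them is the StateStore created explicitly by the KStream client code. Please advise what is missing in the code below.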

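The applications stream is left-joined with the users table so that each emitted record contains both the application and its user. The owner of an application is a user.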

Version: 1.1.0

Thanks

public void process() {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
    config.put(StreamsConfig.CLIENT_ID_CONFIG, CLIENT_ID);
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Common.KAFKA_SOCKET);
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomSerdes.applicationSerde);
    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);
    config.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
    // User properties: userid, username
    KTable<String, User> users = new StreamsBuilder().table(TOPIC_USERS,
            Consumed.with(Serdes.String(), CustomSerdes.serdeFor(User.class)));
    StreamsBuilder builder = new StreamsBuilder();
    // Application properties: id, name
    KStream<String, Application> stream = builder.stream(TOPIC_APPLICATIONS);
    stream
            .map((appId, app) -> KeyValue.pair(app.getOwnerId(), app.getAppId()))
            .leftJoin(users, (app, user) -> "a:" + app + " u:" + user.getUserName())
            .to(OUTPUT_TOPIC);
    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    StreamsManager.startAndHandleShutdown(streams);
}

Error:

Exception in thread "main" org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore topic-users-STATE-STORE-0000000000 is not added yet.
	at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStore(InternalTopologyBuilder.java:716)
	at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStores(InternalTopologyBuilder.java:615)
	at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:797)
	at org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:817)
	at org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:805)
	at com.test.streams.users.AppWithUserConsumerMain.process(AppWithUserConsumerMain.java:50)

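To use a join, both sides of the join (in your case, the KStream and the KTable) must be created from the same StreamsBuilder, so that they belong to the same topology.

In your case you created two StreamsBuilder instances, so the KStream and the KTable do not belong to the same topology. The KTable's state store is registered only in the first builder, which is why the join processor in the second builder cannot find it.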
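A minimal sketch of the fix, under simplifying assumptions: instead of the question's User/Application classes and custom serdes, it uses plain String values. The hypothetical topic `applications` is assumed to be keyed by appId with the owner's userId as value, and the hypothetical topic `users` is assumed to be keyed by userId with the userName as value. The class name `AppUserJoin` and all topic names are illustrative only. The key point is that both `table(...)` and `stream(...)` are called on the same `StreamsBuilder`. `Joined.with` and `Produced.with` pass explicit String serdes so the repartition topic created after `map` and the output topic do not fall back to a mismatched default value serde. The joiner also guards against a null user, because in a left join the table side is null when no matching user exists.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class AppUserJoin {
    // Hypothetical topic names, used only for this sketch.
    static final String TOPIC_APPLICATIONS = "applications";
    static final String TOPIC_USERS = "users";
    static final String OUTPUT_TOPIC = "apps-with-users";

    static Topology buildTopology() {
        // ONE builder for both sides of the join, so the KTable's state store
        // is registered in the same topology as the join processor.
        StreamsBuilder builder = new StreamsBuilder();

        // Assumption: users topic is keyed by userId, value = userName.
        KTable<String, String> users = builder.table(TOPIC_USERS);

        // Assumption: applications topic is keyed by appId, value = ownerId.
        KStream<String, String> apps = builder.stream(TOPIC_APPLICATIONS);

        apps
            // Re-key by owner id so the stream key matches the table key (userId).
            .map((appId, ownerId) -> KeyValue.pair(ownerId, appId))
            .leftJoin(users,
                // Left join: userName is null when no matching user exists.
                (appId, userName) -> "a:" + appId + " u:" + (userName == null ? "unknown" : userName),
                Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-user-join");   // hypothetical id
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        Topology topology = buildTopology();
        // Prints the sub-topologies; the left-join processor should list the table's store.
        System.out.println(topology.describe());

        KafkaStreams streams = new KafkaStreams(topology, config);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

With two builders, as in the question, the `leftJoin` call fails while the topology is being assembled. With a single builder it completes, and printing `topology.describe()` should show the left-join processor connected to the store that backs the users table.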
