我有这样的拓扑结构:
Topology topology = new Topology();
//WS connection processor
topology.addSource(WS_CONNECTION_SOURCE, new StringDeserializer(), new WebSocketConnectionEventDeserializer(), KafkaTopics.WS_CONNECTION_EVENTS_TOPIC)
.addProcessor(SESSION_PROCESSOR, WSUserSessionProcessor::new, WS_CONNECTION_SOURCE)
.addStateStore(sessionStoreBuilder, SESSION_PROCESSOR)
.addSink(WS_STATUS_SINK, KafkaTopics.WS_USER_ONLINE_STATUS_TOPIC, stringSerializer, stringSerializer, SESSION_PROCESSOR)
//WS session routing
.addSource(WS_ROUTING_BY_SESSION_SOURCE, new StringDeserializer(), new StringDeserializer(),
KafkaTopics.WS_DELIVERY_TOPIC)
.addProcessor(WS_ROUTING_BY_SESSION_PROCESSOR, WSSessionRoutingProcessor::new,
WS_ROUTING_BY_SESSION_SOURCE)
.addStateStore(userConnectedNodesStoreBuilder, WS_ROUTING_BY_SESSION_PROCESSOR, SESSION_PROCESSOR)
//WS delivery
.addSource(WS_DELIVERY_SOURCE, new StringDeserializer(), new StringDeserializer(),
INSTANCE_SPECIFIC_TOPIC)
.addProcessor(WS_DELIVERY_PROCESSOR, WSDeliveryProcessor::new, WS_DELIVERY_SOURCE);
拓扑中最后提到的源是每个应用程序实例特有的主题。我希望该主题仅由该实例处理。此主题的数据由上一个处理器根据必须处理该消息的实例推送。
但一旦流启动,它就试图将特定于实例的主题分区也分配给其他实例。我们能在Kafka流中实现这一要求吗?
我希望一个主题只能由特定的实例处理。
您想要的是不可能的。对于Kafka Streams程序,同一应用程序的所有实例都需要完全相同,因此需要具有相同的输入主题。
你需要将你的应用程序分成4个应用程序:第一个应用程序执行程序的共享分区,并写入3个不同的主题。此外,您还有另外3个应用程序(具有自己的application.id
(,每个应用程序都在阅读其中一个主题。
请注意,如果需要,可以在同一JVM中运行多个KafkaStreams
客户端。