对于在Kubernetes中部署的应用程序实例之间查询远程状态存储,有什么建议吗?我们的应用程序实例部署有2个或多个副本。
基于文件https://kafka.apache.org/10/documentation/streams/developer-guide/interactive-queries.html#id7
streams.allMetadataForStore("word-count")
.stream()
.map(streamsMetadata -> {
// Construct the (fictituous) full endpoint URL to query the current remote application instance
String url = "http://" + streamsMetadata.host() + ":" + streamsMetadata.port() + "/word-count/alice";
// Read and return the count for 'alice', if any.
return http.getLong(url);
})
.filter(s -> s != null)
.findFirst();
streamsMetadata.host((是否会导致POD IP?如果是这样,从这个吊舱到另一个吊舱的呼叫会被允许吗?这是正确的方法吗?
streamsMetadata.host((
此方法返回通过application.server
配置参数配置的任何内容。也就是说,每个应用程序实例(在您的情况下是每个POD(都必须设置此配置,以提供其可访问性的信息(例如,其IP和端口(。Kafka Streams会将这些信息分发给所有应用程序实例。
您还需要相应地配置您的POD,以允许通过指定的端口发送/接收查询请求。这部分是您需要自己编写的附加代码,即某种"查询路由层"。Kafka Streams只内置了查询本地状态和分发关于哪个状态托管在哪里的元数据的支持;但是没有内置的remove查询支持。
查询路由层的示例实现(WordCountInteractiveQueries
(可以在Github上找到:https://github.com/confluentinc/kafka-streams-examples
我还建议查看文档和博客文章:
- https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html
- https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/