Kafka API 和/或协议是否提供了一种查询服务器属性的方法?



我正在编写一个 Kafka 生产者,它有时会发送一个请求,其中包含一批超过最大允许请求大小的消息。 似乎我无法直接访问要向其发送消息的 Kafka 集群的服务器属性,并且我还没有找到向服务器查询server.properties文件中设置的值的方法。

尝试发送太大的消息将触发 Kafka 日志,指出...

11:47:37 kafka.1     | Topic and partition to exceptions: 
page-visits-0 -> org.apache.kafka.common.errors.RecordTooLargeException 
(kafka.server.KafkaApis)

您可以使用 KafkaAdminClient API 获取集群信息。它可以提供经纪人级别以及主题级别信息。 下面的代码将为每个节点提供服务器配置。

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;

public class ListTopics {
public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
Properties prop = new Properties();
prop.setProperty("bootstrap.servers","localhost:9092");
AdminClient admin = AdminClient.create(prop);
DescribeClusterResult describeClusterResult = admin.describeCluster();
List<Node> nodes = new ArrayList<>(describeClusterResult.nodes().get());
// Pass the broker node ID here. You can use for loop in case of multiple broker nodes.
ConfigResource resource = new ConfigResource(Type.BROKER, String.valueOf(nodes.get(0).id()));
DescribeConfigsResult configs = admin.describeConfigs(Collections.singletonList(resource));
Map<ConfigResource, Config> config = configs.all().get();
System.out.println(config   );
}
}

附言此 API 只能用于 Kafka 0.11 及更高版本的安装。

假设您的集群至少运行 Kafka 0.11,您可以使用 AdminClientdescribeConfigs()API 来检索代理配置。

例如:

Properties configs = new Properties();
configs.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient client = AdminClient.create(configs);
List<ConfigResource> resources = Arrays.asList(new ConfigResource(Type.BROKER, "0"));
DescribeConfigsResult dcr = client.describeConfigs(resources);
for (Map.Entry<ConfigResource, Config> entry : dcr.all().get().entrySet()) {
System.out.println(entry.getKey() + " - " + entry.getValue());
}

相关内容