春天卡夫卡如何配置两个不同 Kerberos 的 Jaa



我有 Spring 启动应用程序,它是接收数据而不是静止,基于一些业务逻辑,我需要将数据丢弃到两个不同的 kafka 集群,它们有自己的 kerberos 密钥 menttioned jaas 文件。

我已经编写了两个不同的生产者实例,在它们不同的对象实例中具有以下属性。

@Service
public class EventProducer {
private Logger logger = LoggerFactory.getLogger(EventProducer.class);


Producer<String, String> kafkaProducer = null;
@Autowired
public Producer<String, String> createProducer() {
if (kafkaProducer == null) {
Properties props = getKafkaConfig();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Cluste_1_hostaddress:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG,"usertest");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1600);
System.setProperty("javax.security.auth.useSubjectCredsOnly", "true"); 
System.setProperty("java.security.auth.login.config", "/home/user/clusrter_1_jaas.conf);   

props.put("security.protocol", "SASL_PLAINTEXT");
props.put("kafka.cluster.SecurityProtocol",PLAINTEXTSASL);
props.put("sasl.kerberos.service.name",  "kafka"); 
props.put("sasl.kerberos", "sasl.kerberos.service.namekafka");     
props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
props.put("sasl.enabled.mechanisms", "PLAIN");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

kafkaProducer = new KafkaProducer<String, String>(props);
}
return kafkaProducer;
}

}

第二制片人

@Service
public class MovementProducer {
private Logger logger = LoggerFactory.getLogger(MovementProducer.class);

Producer<String, String> kafkaProducer = null;
@Autowired
public Producer<String, String> createProducer() {
if (kafkaProducer == null) {
Properties props = getKafkaConfig();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Cluste_2_hostaddress:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG,"usertest");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1600);
System.setProperty("javax.security.auth.useSubjectCredsOnly", "true"); 
System.setProperty("java.security.auth.login.config", "/home/user/clusrter_2_jaas.conf);   

props.put("security.protocol", "SASL_PLAINTEXT");
props.put("kafka.cluster.SecurityProtocol",PLAINTEXTSASL);
props.put("sasl.kerberos.service.name",  "kafka"); 
props.put("sasl.kerberos", "sasl.kerberos.service.namekafka");     
props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
props.put("sasl.enabled.mechanisms", "PLAIN");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

kafkaProducer = new KafkaProducer<String, String>(props);
}
return kafkaProducer;
}

}

当我将其作为仅启用生产者实例的两个服务启动时,它可以工作,但是当我在单个 jar 中启用两个实例时,只有一个生产者工作,而其他生产者会出现身份验证问题。

我觉得这是由于System.setProperty("java.security.auth.login.config","(,因为它是全局系统变量,所以当我在单个进程中同时使用两者时它会覆盖,所以只有一个有效。

那么除了启动两个过程之外,还有什么方法可以解决这个问题。我只有一个弹簧服务,应该能够生产到两个不同的卡夫卡集群..

Kafka 客户端的最新版本为不同的客户端提供了多个 JAAS confs 的选项。

例如,如果您的实例想要连接具有不同 JAAS conf 的两个集群,我们可以在不同的生产者和使用者级别上覆盖。只需创建 2 个独立的生产者工厂并设置sasl.jaas.conig

clustera.java.security.auth.login.config=com.sun.security.auth.module.Krb5LoginModule required 
useKeyTab=true 
storeKey=true 
keyTab="test.keytab" 
principal="test@domain.com";
clusterb.java.security.auth.login.config=com.sun.security.auth.module.Krb5LoginModule required 
useKeyTab=true 
storeKey=true 
keyTab="testb.keytab" 
principal="testb@domain.com"

相关内容

  • 没有找到相关文章

最新更新