我有 Spring 启动应用程序,它是接收数据而不是静止,基于一些业务逻辑,我需要将数据丢弃到两个不同的 kafka 集群,它们有自己的 kerberos 密钥 menttioned jaas 文件。
我已经编写了两个不同的生产者实例,在它们不同的对象实例中具有以下属性。
@Service
public class EventProducer {
private Logger logger = LoggerFactory.getLogger(EventProducer.class);
Producer<String, String> kafkaProducer = null;
@Autowired
public Producer<String, String> createProducer() {
if (kafkaProducer == null) {
Properties props = getKafkaConfig();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Cluste_1_hostaddress:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG,"usertest");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1600);
System.setProperty("javax.security.auth.useSubjectCredsOnly", "true");
System.setProperty("java.security.auth.login.config", "/home/user/clusrter_1_jaas.conf);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("kafka.cluster.SecurityProtocol",PLAINTEXTSASL);
props.put("sasl.kerberos.service.name", "kafka");
props.put("sasl.kerberos", "sasl.kerberos.service.namekafka");
props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
props.put("sasl.enabled.mechanisms", "PLAIN");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
kafkaProducer = new KafkaProducer<String, String>(props);
}
return kafkaProducer;
}
}
第二制片人
@Service
public class MovementProducer {
private Logger logger = LoggerFactory.getLogger(MovementProducer.class);
Producer<String, String> kafkaProducer = null;
@Autowired
public Producer<String, String> createProducer() {
if (kafkaProducer == null) {
Properties props = getKafkaConfig();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Cluste_2_hostaddress:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG,"usertest");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1600);
System.setProperty("javax.security.auth.useSubjectCredsOnly", "true");
System.setProperty("java.security.auth.login.config", "/home/user/clusrter_2_jaas.conf);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("kafka.cluster.SecurityProtocol",PLAINTEXTSASL);
props.put("sasl.kerberos.service.name", "kafka");
props.put("sasl.kerberos", "sasl.kerberos.service.namekafka");
props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
props.put("sasl.enabled.mechanisms", "PLAIN");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
kafkaProducer = new KafkaProducer<String, String>(props);
}
return kafkaProducer;
}
}
当我将其作为仅启用生产者实例的两个服务启动时,它可以工作,但是当我在单个 jar 中启用两个实例时,只有一个生产者工作,而其他生产者会出现身份验证问题。
我觉得这是由于System.setProperty("java.security.auth.login.config","(,因为它是全局系统变量,所以当我在单个进程中同时使用两者时它会覆盖,所以只有一个有效。
那么除了启动两个过程之外,还有什么方法可以解决这个问题。我只有一个弹簧服务,应该能够生产到两个不同的卡夫卡集群..
Kafka 客户端的最新版本为不同的客户端提供了多个 JAAS confs 的选项。
例如,如果您的实例想要连接具有不同 JAAS conf 的两个集群,我们可以在不同的生产者和使用者级别上覆盖。只需创建 2 个独立的生产者工厂并设置sasl.jaas.conig
clustera.java.security.auth.login.config=com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="test.keytab"
principal="test@domain.com";
clusterb.java.security.auth.login.config=com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="testb.keytab"
principal="testb@domain.com"