因此,由于某种原因,我无法通过Kafka Java API与MSK建立正确的连接。我可以让生产者/消费者使用导体和Kafka CLI工具与MSK一起工作。然而,当我试图连接我的Scala代码时,我无法让它工作。因此,我使用如下配置通过conduktor
和Kafka CLI工具进行连接。
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandle
对于我的Scala应用程序,我正在使用类似的模式
设置生产者/消费者def props: Properties = {
val p = new Properties()
....
p.setProperty("security.protocol", "SASL_SSL")
p.setProperty("sasl.mechanism", "AWS_MSK_IAM")
p.setProperty("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
p.setProperty("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
p
}
val PRODUCER = new KafkaConsumer[AnyRef, AnyRef](props)
因此,当我省略安全配置行并针对Kafka的本地实例运行时,代码工作,但当我试图击中MSK时,它似乎没有构建消费者,我得到以下错误。
java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.
但是,本地运行的实例可以工作。所以这让我认为我没有在配置中正确设置连接到MSK的东西。
我正在尝试遵循以下教程,我使用Scala 2.11和Kafka版本2.41。我还添加了aws-msk-iam-auth
到我的构建。sbt (1.1.0
)。有什么想法或解决方案吗?
这不是我的AWS连接的问题,因为我实现了一些日志记录。我的问题在于Kafka和MSK的本地运行版本之间的差异。我仍在努力理解它们的区别。