我如何在内存或嵌入式kafka中实现而不是为了测试目的



我希望在一个只允许Kafka由上述应用程序运行的环境中部署一个spring-boot应用程序。我的应用程序将成为卡夫卡的生产者和消费者。有没有一种方法可以在启动时运行内存中的实例,用于测试之外的其他目的?或者,有没有一种方法可以启动一个春季启动应用程序,如果它不能作为生产者和消费者连接到卡夫卡,它就不会失败?

edit:在我们能够在此环境中部署Kafka之前,这是一个临时解决方案。该应用程序不生成和使用自己的记录。它是多应用程序部署的一部分,每个应用程序都为其他应用程序生成并使用其他应用程序的Kafka主题。当消费者无法使用Kafka时,我看到了很多关于应用程序启动的信息,但关于生产者的信息并不多。我的应用程序将同时做这两件事。

这样一个应用程序的目的是什么(生成和使用自己的记录(?嵌入式代理不是为生产使用而设计的。

从2.3.4版本开始,容器属性missingTopicsFatal默认为false,这将允许容器在代理不可用的情况下启动。对于早期版本,您可以将其设置为false以获得相同的效果。

如果为true,容器将在启动期间连接到代理,以验证主题是否存在。

您还可以设置容器的autoStartup=false,以防止容器启动。

编辑

我不建议在生产中使用它,但您可以从spring-kafka-test中删除test作用域,并将代理声明为@Bean。。。

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<!--            <scope>test</scope> -->
</dependency>
@Bean
EmbeddedKafkaBroker broker() {
return new EmbeddedKafkaBroker(1)
.kafkaPorts(9092)
.brokerListProperty("spring.kafka.bootstrap-servers"); // override application property
}

我刚刚用这个应用程序测试了它。。。

@SpringBootApplication
public class So63812994Application {
public static void main(String[] args) {
SpringApplication.run(So63812994Application.class, args);
}
@Bean
EmbeddedKafkaBroker broker() {
return new EmbeddedKafkaBroker(1)
.kafkaPorts(9092)
.brokerListProperty("spring.kafka.bootstrap-servers");
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so63812994").partitions(1).replicas(1).build();
}
@KafkaListener(id = "so63812994", topics = "so63812994")
public void listen(String in) {
System.out.println(in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so63812994", "foo");
};
}
}
spring.kafka.bootstrap-servers=realKafka:9092
spring.kafka.consumer.auto-offset-reset=earliest

EDIT2

通过以上配置,同一主机上的其他应用程序可以连接到localhost:9092

如果你需要远程访问这个嵌入式代理,你需要一些额外的配置:

@Bean
EmbeddedKafkaBroker broker() {
return new EmbeddedKafkaBroker(1)
.kafkaPorts(9092)
.brokerProperty("listeners", "PLAINTEXT://localhost:9092,REMOTE://10.0.0.20:9093")
.brokerProperty("advertised.listeners", "PLAINTEXT://localhost:9092,REMOTE://10.0.0.20:9093")
.brokerProperty("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,REMOTE:PLAINTEXT")
.brokerListProperty("spring.kafka.bootstrap-servers");
}

然后,您可以使用10.0.0.20:9093从其他服务器进行连接。

最新更新