我有多个spring-boot应用程序实例从一个kafka主题消费。由于我希望所有实例都能从本主题的所有分区获得数据,因此我为每个实例分配了不同的使用者组,这些使用者组将在启动此应用程序时动态创建。
@Configuration
@EnableKafka
public class KafkaStreamConfig {
@Bean("provisioningStreamsBuilderFactoryBean")
public StreamsBuilderFactoryBean myStreamsBuilderFactoryBean() {
String myCGName = "MY-CG-" + UUID.randomUUID().toString();
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(APPLICATION_ID_CONFIG, myCGName); // setting consumer group name
// setting other props
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean();
streamsBuilderFactoryBean.setStreamsConfiguration(streamsConfiguration);
return streamsBuilderFactoryBean;
}
}
因此,每次实例重新启动或创建新实例时,都会创建一个新的使用者组。这是从我的主题中读到的消费者。
@Component
public class MyConsumer {
@Autowired
private StreamsBuilder streamsBuilder;
@PostConstruct
public void consume() {
final KStream<String, GenericRecord> events = streamsBuilder.stream("my-topic");
events
.selectKey((key, record) -> record.get("id").toString())
.foreach((id, record) -> {
// some computations with the events consumed
});
}
}
现在,由于这些动态创建的使用者组保持不变,并且一旦实例重新启动,它们就不会在我的应用程序中使用,因此它们不再使用消息,并显示出很大的滞后性,因此会产生错误警报。
因此,当应用程序使用Kafka的AdminClientapi关闭时,我想删除这些使用者组。我想在关机挂钩中删除它,就像在MyConsumer类中用@PreDestroy
注释的方法中一样:
@PreDestroy
public void destroyMYCG() {
try (AdminClient admin = KafkaAdminClient.create(properties)) {
DeleteConsumerGroupsResult deleteConsumerGroupsResult = admin.deleteConsumerGroups(Collections.singletonList(provGroupName));
KafkaFuture<Void> future = deleteConsumerGroupsResult.all();
future.whenComplete((aVoid, throwable) -> {
System.out.println("EXCEPTION :: " + ExceptionUtils.getStackTrace(throwable));
});
}
System.out.println(getClass().getCanonicalName() + " :: DESTROYING :: " + provGroupName);
}
但如果我尝试过,消费者群体仍然出现在消费者群体列表中,我就会得到这个例外:
org.apache.kafka.commun.errors.TimeoutException:AdminClient线程不接受新调用。
有人能帮我吗?
使用UUID作为使用者组名称非常糟糕。您可以为每个spring-boot应用程序定义一个最终的str作为消费者goup名称。
- IMHO这是用UUID创建使用者组的逻辑错误。从逻辑上讲,如果同一个进程重新启动,它就是同一个应用程序——同一个消费者。你将解决你的问题,为好的消费者群体提供与应用程序逻辑操作相关的名称
- 我将删除服务器端的消费者组;GC";设置为一定程度的滞后
同样,使用者组不是应用程序id。它不是随机创建的。老实说,我不确定你这样做能解决什么样的问题。因为事实上,通过说消费者群体是随机的,你说我的代码在做随机的事情,而我不知道在消息处理中会发生什么。我们有非常复杂的Kafka消息处理,总是有更好或更坏的进程名称,但至少存在一个,它不是随机的。
FYI,deleteConsumerGroups
方法将异步删除consumerGroups,在此之前,您的应用程序已关闭,adminClient也已断开连接。这就是为什么它抛出AdminClient的线程不接受新调用的原因。
为了避免这个问题,您可以尝试同步调用来删除consumerGroups。
DeleteConsumerGroupsResult deleteConsumerGroupsResult = kafkaAdminClient.deleteConsumerGroups(kafkaConsumerGroups);
try {
deleteConsumerGroupsResult.all().get();
} catch (Exception e) {
log.error("Received exception while deleting consumerGroups: {}, {}", e.getMessage(),
ExceptionUtils.getStackTrace(e));
}