Making a consumer listener seek-aware from the beginning



I can't make sense of the design of the SeekAware interface.

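I'm trying to implement dynamic listeners with spring-kafka for new topics that are created through a REST API. So far the listener only consumes records that arrive after it has started on the topic. I'd also like to change the offset the listener starts from, so that if I want seekToBeginning, the listener reads the topic from the beginning whenever it is invoked.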

The important code snippets are below.

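@Component
public class CustomKafkaListenerRegistrar implements InitializingBean {

    // Used below to look up the listener bean; the original snippet referenced it without declaring it.
    @Autowired
    private BeanFactory beanFactory;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    private KafkaListenerContainerFactory kafkaListenerContainerFactory;

    // afterPropertiesSet() is not shown in this snippet.

    @SneakyThrows // Class.forName throws the checked ClassNotFoundException
    public void registerCustomKafkaListener(String name, CustomKafkaListenerProperty customKafkaListenerProperty,
            boolean startImmediately) {
        // Resolve the listener class name relative to this class's package.
        String listenerClass = String.join(".", CustomKafkaListenerRegistrar.class.getPackage().getName(),
                customKafkaListenerProperty.getListenerClass());
        CustomMessageListener customMessageListener =
                (CustomMessageListener) beanFactory.getBean(Class.forName(listenerClass));
        // Register an endpoint for the topic and optionally start its container right away.
        kafkaListenerEndpointRegistry.registerListenerContainer(
                customMessageListener.createKafkaListenerEndpoint(name, customKafkaListenerProperty.getTopic()),
                kafkaListenerContainerFactory, startImmediately);
    }
}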

public abstract class CustomMessageListener {

    private static int NUMBER_OF_LISTENERS = 0;

    @Autowired
    private KafkaProperties kafkaProperties;

    public abstract KafkaListenerEndpoint createKafkaListenerEndpoint(String name, String topic);

    protected MethodKafkaListenerEndpoint<String, String> createDefaultMethodKafkaListenerEndpoint(String name,
            String topic) {
        MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint = new MethodKafkaListenerEndpoint<>();
        kafkaListenerEndpoint.setId(getConsumerId(name));
        kafkaListenerEndpoint.setGroupId(kafkaProperties.getConsumer().getGroupId());
        kafkaListenerEndpoint.setAutoStartup(true);
        kafkaListenerEndpoint.setTopics(topic);
        kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        return kafkaListenerEndpoint;
    }

    private String getConsumerId(String name) {
        if (isBlank(name)) {
            return CustomMessageListener.class.getCanonicalName() + "#" + NUMBER_OF_LISTENERS++;
        } else {
            return name;
        }
    }

    private boolean isBlank(String string) {
        return Optional.ofNullable(string)
                .map(String::isBlank)
                .orElse(true);
    }
}

@Component
public class MyCustomMessageListener extends CustomMessageListener {

    @Override
    @SneakyThrows
    public KafkaListenerEndpoint createKafkaListenerEndpoint(String name, String topic) {
        MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint =
                createDefaultMethodKafkaListenerEndpoint(name, topic);
        kafkaListenerEndpoint.setBean(new MyMessageListener());
        kafkaListenerEndpoint.setMethod(MyMessageListener.class.getMethod("onMessage", ConsumerRecord.class));
        return kafkaListenerEndpoint;
    }

    @Slf4j
    private static class MyMessageListener implements MessageListener<String, String> {

        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            log.info("My message listener got a new record: " + record);
            log.info("message is: " + record.toString());
            CompletableFuture.runAsync(this::sleep)
                    .join();
            log.info("My message listener done processing record: " + record);
        }

        @SneakyThrows
        private void sleep() {
            Thread.sleep(5000);
        }
    }
}

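As far as I know, MyCustomMessageListener needs to implement the ConsumerSeekAware interface, which in turn provides the seekToBeginning method to be invoked when the listener is created and triggered.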

Please help.

Note: I'm using the onMessage method of MessageListener<String, String>, which to my knowledge works the same way as @KafkaListener. Please correct me if I'm wrong.

If you implement MessageListener, you don't need all the KafkaListenerEndpoint stuff; just create a listener container and add the listener to it.

@SpringBootApplication
public class So72546425Application {

    public static void main(String[] args) {
        SpringApplication.run(So72546425Application.class, args);
    }

    @Bean
    ApplicationRunner runner1(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so72546425", "foo");
        };
    }

    @Bean
    ApplicationRunner runner2(ContainerCreator creator) {
        return args -> {
            System.out.println("Hit enter to create a container");
            System.in.read();
            MessageListenerContainer container = creator.createContainerForTopic("so72546425");
            container.start();
        };
    }

}

@Component
class ContainerCreator {

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    ContainerCreator(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.factory = factory;
    }

    ConcurrentMessageListenerContainer<String, String> createContainerForTopic(String topic) {
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
        container.getContainerProperties().setMessageListener(new MyListener());
        container.getContainerProperties().setGroupId("group.for." + topic);
        return container;
    }

}

class MyListener extends AbstractConsumerSeekAware implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        System.out.println(data);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        System.out.println("Seeking to beginning");
        callback.seekToBeginning(assignments.keySet());
    }

}
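
Since the question was about the design of the seek-aware callback, here is a rough, library-free sketch of the idea: the container owns the read positions, and on partition assignment it hands the listener a callback through which the listener can move those positions before any record is delivered. Everything in it (SeekAwareSketch, SeekCallback, SeekAwareListener, ToyContainer, run) is a hypothetical stand-in written for illustration, not a spring-kafka type, and the real container does considerably more than this.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: none of these types are spring-kafka classes.
class SeekAwareSketch {

    /** Hypothetical stand-in for the seek callback handed to the listener. */
    interface SeekCallback {
        void seekToBeginning(Set<Integer> partitions);
    }

    /** Hypothetical stand-in for a message listener that is also seek-aware. */
    interface SeekAwareListener {
        void onPartitionsAssigned(Map<Integer, Long> assignments, SeekCallback callback);
        void onMessage(String value);
    }

    /** Hypothetical container: it owns the read positions and drives the listener. */
    static class ToyContainer {
        private final Map<Integer, List<String>> partitions;
        private final Map<Integer, Long> positions;
        private final SeekAwareListener listener;

        ToyContainer(Map<Integer, List<String>> partitions, Map<Integer, Long> committed,
                SeekAwareListener listener) {
            this.partitions = partitions;
            this.positions = new LinkedHashMap<>(committed); // start from committed offsets
            this.listener = listener;
        }

        void start() {
            // Step 1: on assignment, give the listener a callback that can move the positions.
            listener.onPartitionsAssigned(new LinkedHashMap<>(positions),
                    assigned -> assigned.forEach(p -> positions.put(p, 0L)));
            // Step 2: only then deliver records, starting from the (possibly moved) positions.
            positions.forEach((partition, start) -> {
                List<String> records = partitions.get(partition);
                for (int i = start.intValue(); i < records.size(); i++) {
                    listener.onMessage(records.get(i));
                }
            });
        }
    }

    /** Runs the toy container once and returns the values the listener received. */
    static List<String> run(boolean seekToBeginning) {
        List<String> received = new ArrayList<>();
        SeekAwareListener listener = new SeekAwareListener() {
            @Override
            public void onPartitionsAssigned(Map<Integer, Long> assignments, SeekCallback callback) {
                if (seekToBeginning) {
                    callback.seekToBeginning(assignments.keySet());
                }
            }

            @Override
            public void onMessage(String value) {
                received.add(value);
            }
        };
        // One partition holding three records; the group has already committed offset 2.
        Map<Integer, List<String>> partitions = Map.of(0, List.of("a", "b", "c"));
        new ToyContainer(partitions, Map.of(0, 2L), listener).start();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [c]
        System.out.println(run(true));  // prints [a, b, c]
    }
}
```

Without the seek, the toy listener resumes at the committed offset and sees only the last record; with it, the same listener sees the partition from offset 0. That mirrors why the seek in MyListener is placed in onPartitionsAssigned rather than in onMessage.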
