I have several Kafka consumers configured in Spring Boot. This is what kafka.properties looks like (only one consumer's configuration is listed here):
kafka.topics=
bootstrap.servers=
group.id=
enable.auto.commit=
auto.commit.interval.ms=
session.timeout.ms=
schema.registry.url=
auto.offset.reset=
kafka.enabled=
Here is the configuration:
@Configuration
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConsumerFactory<String, String> pindropConsumerFactory() {
        Map<String, Object> dataRiverProps = new HashMap<>();
        dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
        dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
        dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
        dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
        dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));
        dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));
        return new DefaultKafkaConsumerFactory<>(dataRiverProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(pindropConsumerFactory());
        return factory;
    }
}
Here is the consumer:
@Component
public class KafkaConsumer {

    @Autowired
    private MessageProcessor messageProcessor;

    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void consumeJson(String message) {
        // processing message
    }
}
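Is there a way to use the "kafka.enabled" property so that I can control whether this consumer is created, or whether it retrieves messages? Thanks a lot!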
Use the autoStartup attribute (true/false) on the consumer's listener to do this, as shown below:
@KafkaListener(id = "foo", topics = "Topic1", groupId = "group_id",
        containerFactory = "kafkaListenerContainerFactory", autoStartup = "${listen.auto.start:false}")
public void consume(String message) {
    // System.out.println("Consumed message: " + message);
}
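To tie this to the kafka.enabled property from the question, the placeholder can point at that property instead of listen.auto.start. The following is a minimal sketch, assuming kafka.enabled resolves to true or false and that defaulting to true when it is missing is acceptable. The class name ToggleableKafkaConsumer and the listener id "foo" are illustrative, not from the question.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical class name; it stands in for the KafkaConsumer from the question.
@Component
public class ToggleableKafkaConsumer {

    // autoStartup accepts a property placeholder. When it resolves to "false",
    // the listener container is still registered but is not started.
    // ":true" is an assumed default for the case where kafka.enabled is absent.
    @KafkaListener(id = "foo",
            topics = "#{'${kafka.topics}'.split(',')}",
            containerFactory = "kafkaListenerContainerFactory",
            autoStartup = "${kafka.enabled:true}")
    public void consume(String message) {
        // process the message
    }
}
```

A container that was not started automatically can still be started later at runtime, for example by injecting KafkaListenerEndpointRegistry and calling getListenerContainer("foo").start().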
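To disable the Kafka configuration, you can, for example:
- Annotate KafkaConsumerConfig with @ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true).
- Remove @Component from the KafkaConsumer class and define it as a @Bean in KafkaConsumerConfig instead, so that the consumer is created only when that configuration is active.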
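Put together, the two steps above might look like the sketch below. It assumes the KafkaConsumer class from the question with its @Component annotation removed, and it omits the two factory beans for brevity. The bean method name kafkaConsumer is illustrative.

```java
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// The whole configuration class is skipped when kafka.enabled=false.
// matchIfMissing = true keeps it active when the property is not defined at all.
@Configuration
@ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)
public class KafkaConsumerConfig {

    // ... pindropConsumerFactory() and kafkaListenerContainerFactory() as in the question ...

    // KafkaConsumer no longer carries @Component, so it is only created
    // when this configuration class is active.
    @Bean
    public KafkaConsumer kafkaConsumer() {
        return new KafkaConsumer();
    }
}
```

One thing worth verifying in your own setup: the condition is evaluated against the Environment, and a @PropertySource declared on the same class may not have been loaded yet at that point. Defining kafka.enabled in application.properties (or passing it as a system property) is probably the safer assumption.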
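To control message retrieval in KafkaConsumer, do the following:
- Inject the property value into the Kafka consumer. Note the ${...} placeholder syntax, without which Spring would try to inject the literal string "kafka.enabled":
@Value("${kafka.enabled}") private Boolean enabled;
- Then use a simple if check in the method annotated with @KafkaListener.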
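As a rough sketch, the consumer from the question with such a guard could look like this. The ":true" default is an assumption added here so that the application still starts when the property is missing. The messageProcessor field is left out for brevity.

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    // Resolved from the Environment at startup; ":true" is an assumed default.
    @Value("${kafka.enabled:true}")
    private boolean enabled;

    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}",
            containerFactory = "kafkaListenerContainerFactory")
    public void consumeJson(String message) {
        if (!enabled) {
            // The record has already been polled at this point; it is only skipped.
            return;
        }
        // processing message
    }
}
```

Keep in mind that with this approach the consumer still joins the group and polls records, and the offsets of skipped records can still be committed, so those messages are effectively dropped rather than left on the topic. If the goal is to not consume at all, the autoStartup or @ConditionalOnProperty options are likely a better fit.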