通过使用Spring Kafka中另一个消费者的值触发一个Kafka消费者



我有一个调度程序,它产生一个事件。我的消费者消费这个事件。此事件的有效负载是一个json,包含以下字段:

private String topic;
private String partition;
private String filterKey;
private long CustId;  

现在我需要触发另一个消费者,它将获取所有这些信息,我从第一个消费者那里得到响应。

@KafkaListener(topics = "<**topic-name-from-first-consumer-response**>", groupId = "group" containerFactory = "kafkaListenerFactory")
public void consumeJson(List<User> data, Acknowledgment acknowledgment,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
// consumer code goes here...}

我需要创建一些动态变量,我可以传递它来代替主题名。

同样,我在配置文件中使用过滤,我需要在配置中动态传递密钥。

factory.setRecordFilterStrategy(new RecordFilterStrategy<String, Object>() {
@Override
public boolean filter(ConsumerRecord<String, Object> consumerRecord) {
if(consumerRecord.key().equals("**Key will go here**")) {
return false;
}
else {
return true;
}
}
});

我们如何从第一个消费者的响应中动态注入这些值并触发第二个消费者呢?两个消费者都在同一个应用程序

不能使用带注释的侦听器,该配置仅在初始化期间使用;您需要自己创建侦听器容器(使用ConcurrentKafkaListenerContainerFactory)来动态创建侦听器。

编辑

下面是一个例子。

@SpringBootApplication
public class So69134055Application {
public static void main(String[] args) {
SpringApplication.run(So69134055Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so69134055").partitions(1).replicas(1).build();
}
}
@Component
class Listener {
private static final Logger log = LoggerFactory.getLogger(Listener.class);
private static final Method otherListen;
static {
try {
otherListen = Listener.class.getDeclaredMethod("otherListen", List.class);
}
catch (NoSuchMethodException | SecurityException ex) {
throw new IllegalStateException(ex);
}
}
private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
private final MessageHandlerMethodFactory methodFactory;
private final KafkaAdmin admin;
private final KafkaTemplate<String, String> template;
public Listener(ConcurrentKafkaListenerContainerFactory<String, String> factory, KafkaAdmin admin,
KafkaTemplate<String, String> template, KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp) {
this.factory = factory;
this.admin = admin;
this.template = template;
this.methodFactory = bpp.getMessageHandlerMethodFactory();
}
@KafkaListener(id = "so69134055", topics = "so69134055")
public void listen(String topicName) {
try (AdminClient client = AdminClient.create(this.admin.getConfigurationProperties())) {
NewTopic topic = TopicBuilder.name(topicName).build();
client.createTopics(List.of(topic)).all().get(10, TimeUnit.SECONDS);
}
catch (Exception e) {
log.error("Failed to create topic", e);
}
ConcurrentMessageListenerContainer<String, String> container =
this.factory.createContainer(new TopicPartitionOffset(topicName, 0));
BatchMessagingMessageListenerAdapter<String, String> adapter =
new BatchMessagingMessageListenerAdapter<>(this, otherListen);
adapter.setHandlerMethod(new HandlerAdapter(
this.methodFactory.createInvocableHandlerMethod(this, otherListen)));
FilteringBatchMessageListenerAdapter<String, String> filtered =
new FilteringBatchMessageListenerAdapter<>(adapter, record -> !record.key().equals("foo"));
container.getContainerProperties().setMessageListener(filtered);
container.getContainerProperties().setGroupId("group.for." + topicName);
container.setBeanName(topicName + ".container");
container.start();
IntStream.range(0, 10).forEach(i -> this.template.send(topicName, 0, i % 2 == 0 ? "foo" : "bar", "test" + i));
}
void otherListen(List<String> others) {
log.info("Others: {}", others);
}
}
spring.kafka.consumer.auto-offset-reset=earliest

输出-显示过滤器应用于键中包含bar的记录。

Others: [test0, test2, test4, test6, test8]

最新更新