我对Kafka很陌生,我正在做一个项目来学习和理解Kafka。我在我的笔记本电脑上运行Kafka,所以我有一个消费者和一个生产者,我正在使用Java (Spring Boot)来收听这些流并消费消息。
假设我创建了两个不同的组,命名为"automatic"one_answers";manual"。对于"自动"第一,我不希望消息立即执行任务。我想在1分钟内聚合消息,当1分钟过去时,我希望它触发一些自定义事件。
但是对于"手册"第一,我希望消息使用它并立即触发事件。
但是当我从生产者发送消息时它会转到这个公共主题本身并且消息中有一个属性表示如果它是一个手册或";automatic"类型。
这是我的application.properties
文件中的Kafka主题声明。
spring.cloud.stream.kafka.bindings.automatic.consumer.configuration.client.id=automatic-consumption-event
spring.cloud.stream.bindings.automatic.destination=main.event
spring.cloud.stream.bindings.automatic.binder=test-stream-app
spring.cloud.stream.bindings.automatic.group=consumer-automatic-group
spring.cloud.stream.bindings.automatic.consumer.concurrency=1
spring.cloud.stream.kafka.bindings.manual.consumer.configuration.client.id=manual-consumption-event
spring.cloud.stream.bindings.manual.destination=main.event
spring.cloud.stream.bindings.manual.binder=test-app
spring.cloud.stream.bindings.manual.group=consumer-manual-group
spring.cloud.stream.bindings.manual.consumer.concurrency=1
我已经创建了2个独立的方法来使用和执行不同的操作,像这样
private windows;
@PostConstruct()
private void init() {
this.windows = SessionWindows.with(Duration.ofSeconds(5)).grace(Duration.Zero);
}
public void automatic(Ktream<string, CustomObjectType> eventStream) {
eventStream.filter((x, y) -> y != null && !y.isManual(), Named.as("automatic_event"))
.groupByKey(Grouped.with("participant_id", Serdes.String(), Serdes.Long()))
.windowedBy(windows)
.reduce(Long::sum, Named.as("participant_id_sum"))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream(Named.as("participant_id_stream"))
.foreach(this::fireCustomEvent);
}
@StreamListener("manual-event")
public void manual(@Payload String payload) {
var parsedObject = this.parseJSON(payload);
if(!payload.isManual()) {
return;
}
this.fireCustomEvent();
}
private CustomObjectType parseJSON(String json) {
return JSONObject.parseObject(json.substring(json.indexOf("{")), CustomObjectType.class);
}
private void fireCustomEvent(){
// Should do something.
}
我在我的系统上运行了这个命令。
bin/kafka-console-producer.sh --topic main.event --property "parse.key=true" --property "key.separator=:" --bootstrap-server localhost:62341
然后我用下面的命令运行consumer:
bin/kafka-consumer.sh --topic main.event --from-beginning --bootstrap-server localhost:62341
这些是我传递给制作人的事件。
123: {"eventHeader": null, "data": {"headline": "You are winner", "id": "42", "isManual": true}}
987: {"eventHeader": null, "data": {"headline": "You will win", "id": "43", "isManual": false}}
当事件被生产者传递时,我可以看到我的manual()
触发消息。但它正在做预期的事情,接收信息并立即触发事件。但是,它消耗的是两种类型的消息,问题是"自动"消息不再聚合。因为它们已经被消费者拿走了。
每次我重新启动我的spring引导应用程序时,automatic()
方法触发,但它没有找到任何要过滤的消息,因为它们已经被消耗了,根据我的理解。有人能帮我弄清楚我在哪里引起了混乱吗?
我不太明白你的问题。Spring将"自动"启动这两个功能。但是在automatic()
参数
Ktream
的错字同时使用消息类型
对吧…因为两者都存在于同一个主题中。也许你想在Kafka流中使用分支/分割操作符来为所有手动事件创建一个单独的主题,这是你的"手册"。方法读取?
因为它们已经被消耗了
没关系。重要的是偏移量被提交了。只要数据保留在主题中,您可以随意多次重复使用主题。
强制重复使用,可以使用
KafkaConsumer.seek
- 停止应用程序后的
kafka-consumer-groups --reset-offsets
- 给应用程序一个新的
application.id
/group.id
以及消费者配置auto.offset.reset=earliest