我正在使用Spring kafka开发一个Spring启动应用程序,该应用程序侦听kafka的单个主题,然后分离各个类别的记录,从中创建一个json文件并将其上传到AWS S3。
我在 Kafka 主题中收到了巨大的数据量,我需要确保 json 文件被适当地分块,以限制上传到 S3 的 json 数量。
以下是我对 kafka 消费者的application.yml
配置。
spring:
kafka:
consumer:
group-id: newton
auto-offset-reset: earliest
fetch-max-wait:
seconds: 1
fetch-min-size: 500000000
max-poll-records: 50000000
value-deserializer: com.forwarding.application.consumer.model.deserializer.MeasureDeserializer
我创建了一个用于连续阅读该主题的听众。
即使使用上述配置,我也会在控制台中接收记录,如下所示:
2019-03-27T15:25:56.02+0530 [APP/PROC/WEB/0] OUT 2019-03-27 09:55:56.024 INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl : Time taken(ms) 56. No Of measures: 60
2019-03-27T15:25:56.21+0530 [APP/PROC/WEB/2] OUT 2019-03-27 09:55:56.210 INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl : Time taken(ms) 80. No Of measures: 96
2019-03-27T15:25:56.56+0530 [APP/PROC/WEB/0] OUT 2019-03-27 09:55:56.560 INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl : Time taken(ms) 76. No Of measures: 39
2019-03-27T15:25:56.73+0530 [APP/PROC/WEB/2] OUT 2019-03-27 09:55:56.732 INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl : Time taken(ms) 77. No Of measures: 66
任何人都可以让我知道可以根据application.yml
中的配置配置什么来获取收到的记录吗?
我刚刚复制了您的配置(除了最大等待时间 - 请参阅我使用的语法),它工作正常......
spring:
kafka:
consumer:
group-id: newton
auto-offset-reset: earliest
fetch-max-wait: 1s
fetch-min-size: 500000000
max-poll-records: 50000000
2019-03-27 13:43:55.454 INFO 98982 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 1000
fetch.min.bytes = 500000000
group.id = newton
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 50000000
...
使用...properties
属性设置不直接支持的任意属性作为引导属性。
例如
spring:
kafka:
consumer:
properties:
max.poll.interval.ms: 300000
或
spring:
kafka:
consumer:
properties:
max:
poll:
interval:
ms: 300000
文档在这里。
自动配置支持的属性显示在附录 A, 通用应用程序属性中。请注意,在大多数情况下,这些属性(连字符或驼峰大小写)直接映射到 Apache Kafka 虚线属性。有关详细信息,请参阅 Apache Kafka 文档。
这些属性中的前几个适用于所有组件(生成者、使用者、管理员和流),但如果希望使用不同的值,可以在组件级别指定。Apache Kafka 指定具有高、中或低重要性的属性。Spring 引导自动配置支持所有高重要性属性、一些选定的中低属性以及任何没有默认值的属性。
只有 Kafka 支持的属性子集可以直接通过 KafkaProperties 类获得。如果要使用不直接支持的其他属性配置创建者或使用者,请使用以下属性:
spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth