基于时间的日志压缩在Kafka中不起作用



我们需要创建紧凑的主题,在特定大小(segment.bytes)后需要压缩,但最重要在主题级配置中经过特定时间(segment.ms)后(即使segment.bBytes尚未达到)。

现在,我们已经看到segment.bytes受到了表彰,但segment.ms没有受到表彰。我已经用Confluent kafka 5.x分发复制了这个问题

https://kafka.apache.org/documentation/#topicconfigs

这是我在apachekafka文档中读到的关于segment.ms的内容,这让我相信我们的理解是正确的-segment.ms将覆盖segment.bytes-当它出现时卡夫卡正在做一个主题的压缩。

segment.ms此配置控制即使分段文件未满,Kafka也会强制日志滚动以确保保留可以删除或压缩旧数据。

我发送的数据具有在0-20个值之间旋转的键,并且字符串"Spring Kafka Producer and Consumer Example",我将键值附加到这个字符串中。

这是生产者代码

@Override
public void run(String... strings) throws Exception {
String data = "Spring Kafka Producer and Consumer Example";
for (int j = 0; j < 30000; j++) {
for (int i = 0; i < 20; i++) {
sender.send(new Integer(i).toString(), data + i);
}
}
}

代码示例在这里https://github.com/leofoto/kafka-producer-consumer.git

我已经从中获取了代码示例(并为此测试用例对其进行了修改)https://memorynotfound.com/spring-kafka-consume-producer-example/

我首先创建了紧凑的主题,在broker日志中我看到了以下

在/tmp/kafka日志中为分区my-topic-compact-0创建日志属性{compression.type->生产者,message.format.version->2.0-IV1,file.delete.delay.ms->60000,max.message.bytes->1000012,min.compaction.lag.ms->0,message.timestamp.type->CreateTime,message.downconversion.enable->true,min.insync.replicas->1,segment.jitter.ms->0,preallocate->false,最小可清洁出生率->0.5,指数间隔字节->4096,unclan.leader.election.enable->false,retention.bytes->-1,delete.retension.ms->86400000,cleanup.policy->[delete],flush.ms->9223372036854775807,segment.ms->604800000,segment.bytes->1073741824,retention.ms->604800000,message.timestamp.difference.max.ms->9223372036854775807,segment.index.bytes->10485760,flush.messages->9223372036854775807}。(kafka.log.LogManager)〔2018-09-17 21:28:00110〕INFO[分区my-topic-compact-0 broker=0]没有检查点找到分区my-topic-compact-0的高水位线(kafka.cluster.Partition)

然后当我更改配置以使主题紧凑时

/kafka-configs--zookeeper localhost:2181--实体类型主题--entity name my topic compact--alter--add config min.cleanable.sdirty。ratio=0.01,cleanup.policy=compact,segment.ms=12000,delete。retention.ms=100,segment.bytes=200000已完成更新实体的配置:topic"my topic compact"。

Broker日志再次显示它(现在正确地报告它是一个紧凑的主题)

〔2018-09-17 22:06:25745〕信息正在处理通知/config/changes(kafka.common.ZkNodeChangeNotificationListener)〔2018-09-17 22:06:25746〕entityPath的INFO处理覆盖:topics/my topic compact with config:Map(cleanup.policy->compact,segment.ms->12000,最小可清洁度。出生率->0.01,segment.bytes->200000,delete.retension.ms->100)(kafka.server.DynamicConfigManager)

kafka-config-descripte命令也清楚地显示了

/kafka-configs--zookeeper localhost:2181--实体类型主题--实体名称我的主题紧凑——描述

主题"我的主题压缩"的配置是主题的配置"我的主题紧凑型"是segment.bytes=200000,min.cleanable.sdirty.aratio=0.01,delete.retention.ms=100,segment.ms=12000,cleanup.policy=compact

当我启动kafka服务器时,我看到以下消息

<lt;以300000毫秒的周期开始日志清理>>[[我确信300秒是代理配置值,主题级别值为12在这种情况下为秒]]

〔2018-09-17 22:01:31215〕信息〔日志分区=my-topic-non-compact-0,dir=/tmp/kafka logs]已完成日志加载,其中包含1个段,日志启动偏移量0和日志结束偏移量20在2毫秒内(kafka.log.log)〔2018-09-1722:01:31218]信息日志加载在378毫秒内完成。(kafka.log.LogManager)〔2018-09-17 22:01:31224〕信息启动日志清理周期为300000毫秒。(kafka.log.LogManager)〔2018-09-1722:01:31225]信息启动日志刷新程序,默认周期为9223372036854775807毫秒。(kafka.log.LogManager)〔2018-09-1722:01:31439]信息正在0.0.0.0:9092上等待套接字连接。(kafka.network.Acceptor)〔2018-09-17 22:01:31463〕信息〔SocketServerbrokerId=0]已启动1个接收器线程(kafka.network.SocketServer)〔2018-09-17 22:01:31478〕信息〔ExpirationReaper-0-Produce〕:启动(kafka.server.DelayedOperationPurgary$ExpiredOperationReaper)〔2018-09-17 22:01:31478〕信息〔ExpirationReaper-0-Fetch〕:正在启动(kafka.server.DelayedOperationPurgary$ExpiredOperationReaper)〔2018-09-17 22:01:31479〕信息〔ExpirationReaper-0删除记录〕:正在启动(kafka.server.DelayedOperationPurgary$ExpiredOperationReaper)〔2018-09-17 22:01:31487〕信息〔LogDirFailureHandler〕:正在启动(kafka.server.ReplicaManager$LogDirFailureHandler)〔2018-09-1722:01:31537]信息正在创建/brokers/ids/0(安全吗?false)(kafka.zk.KafkaZkClient)〔2018-09-17 22:01:31541〕信息结果在/brokers/ids/0创建znode是:OK(kafka.zk.KafkaZkClient)〔2018-09-17 22:01:31542〕信息已在path注册经纪人0/brokers/ids/0,地址为:ArrayBuffer(端点(192.168.0.119092,ListenerName(PLAINTEXT),PLAINTEXT))(kafka.zk.KafkaZkClient)

然后,当我写了很多数据时,我看到片段也在滚动,我看到了很多活动,这推动了压实的发生。[这很好]我发送了超过30万张记录,压缩发生了以及消费消息的新消费者(在进行压缩之后),它看到大约3225条记录。

〔2018-09-17 22:09:21602〕信息〔日志分区=my-topic-compact-0,dir=/tmp/kafka logs]在4ms内以偏移量185361滚动新的日志段。(kafka.log.log)〔2018-09-17 22:09:21673〕信息〔ProducerStateManagerpartition=my-topic-compact-0]正在偏移量处写入生产者快照188897(kafka.log.ProducerStateManager)〔2018-09-17 22:09:21675〕信息[Log partition=my-topic-compact-0,dir=/tmp/kafka logs]滚动的新日志3毫秒内偏移量为188897的分段。(kafka.log.log)〔2018-09-1722:09:21755]信息[ProducerStateManager partition=my-topic-compact-0]正在写入偏移192348处的生产者快照(kafka.log.ProducerStateManager)〔2018-09-17 22:09:21758〕信息〔日志partition=my-topic-compact-0,dir=/tmp/kafka logs]滚动的新日志在3毫秒内偏移192348的分段。(kafka.log.log)〔2018-09-1722:09:21831]信息[ProducerStateManager partition=my-topic-compact-0]正在写入偏移量为195846的生产者快照(kafka.log.ProducerStateManager)〔2018-09-17 22:09:21834〕信息〔日志partition=my-topic-compact-0,dir=/tmp/kafka logs]滚动的新日志3毫秒内偏移量为195846的分段。(kafka.log.log)〔2018-09-1722:09:21879]信息[ProducerStateManager partition=my-topic-compact-0]正在以偏移量199461写入生产者快照(kafka.log.ProducerStateManager)〔2018-09-17 22:09:21882〕信息〔日志partition=my-topic-compact-0,dir=/tmp/kafka logs]滚动的新日志分段偏移量199461在3毫秒内。(kafka.log.log)〔2018-09-1722:09:21909]信息[ProducerStateManager partition=my-topic-compact-0]正在写入偏移量203134处的生产者快照(kafka.log.ProducerStateManager)〔2018-09-17 22:09:21915〕信息〔日志partition=my-topic-compact-0,dir=/tmp/kafka logs]滚动的新日志在7毫秒内偏移203134的分段。(kafka.log.log)〔2018-09-1722:09:21980]信息[ProducerStateManager partition=my-topic-compact-0]正在写入偏移量206703处的生产者快照(kafka.log.ProducerStateManager)〔2018-09-17 22:09:21985〕信息〔日志partition=my-topic-compact-0,dir=/tmp/kafka logs]滚动的新日志在6毫秒内偏移206703的分段。(kafka.log.log)

现在,无论等待多长时间(超过12秒),日志压缩都不会启动

无论我在运行以下命令之前等待了多少(每次都有新的消费者组)

/kafka控制台使用者--引导服务器localhost:9092--主题我的主题压缩--从一开始--属性print.key=true--组新组16

每个新消费者消耗的消息正好是3225条,如果压缩发生在主题级segment.ms已经通过之后,它应该把它压缩到只有20个键和它们的最新值。但我们没有看到这种行为。我错过了什么吗。

删除不工作

最重要的是,当我发送相同密钥的空有效载荷时,比如这个

@Override
public void run(String... strings) throws Exception {
String data = "Spring Kafka Producer and Consumer Example";
for (int j = 0; j < 2; j++) {
for (int i = 0; i < 20; i++) {
sender.send(new Integer(i).toString(), null);
}
}
}

我们预计这些消息最终会在下一个压缩循环中被删除。在segment.ms时间过去后(在主题级别配置的情况下为12秒)也不会发生这种情况

/kafka-configs--zookeeper localhost:2181--实体类型主题--实体名称我的主题紧凑——描述

主题"我的主题压缩"的配置是主题的配置"我的主题紧凑型"是segment.bytes=200000,min.cleanable.sdirty.aratio=0.01,delete.retention.ms=100,segment.ms=12000,cleanup.policy=compact

基于时间的日志压缩在kafka(apache kafka 2.x或confluent distribution 5.x)中还不支持

就目前而言,这对我们来说是行不通的。分享以下信息供他人参考

一次https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+添加+a+最大值+对数+压实+滞后已完成并实施,我鼓励您重新考虑使用案例

最新更新