从跨多个主题的相同分区检索数据



在我们当前的系统中,我们有4个主题

人类(20),话题二(20),话题三(20),Topic4 (20)

对于生产者端,我们已经应用了自定义IDpartitioner,它基本上执行ID%20 (20=numpartition)

public Integer partition(String topic, Object key, Object value, int numPartition) {
try {
log.info("topic:{} key:{} numPartition:{}", topic, key.toString(), numPartition);
int currPartition = Integer.valueOf((String) key) % numPartition;
if(currPartition < 0) {
log.info("Partition less than 0 for topic:{} key:{} currPartition:{} ", topic, key.toString(), currPartition);
currPartition = 1;
}
return currPartition;
} catch(Exception e) {
return 1;
}
}

对于消费者端我们有2个消费者在我们的消费者组云消费者在ECS集群。使用以下配置。每个消费者都在听所有4个主题。

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
}

现在的问题是我们没有收到来自不同主题的同一消费者的同一分区的所有消息。

尝试检查分区分配,下面是结果

ConsumerRebalanceListener myConsumerRebalanceListener(){onPartitionsAssigned()}

消费者1>话题1-(0-9),话题2-(0-9),话题3-(10-19),话题4->(10-19)
消费者2>话题1-(10-19),话题2-(10-19),话题3->(0-9),题目4- (0-9)

现在可以看到,对于分区5我们从topic1和topic3收到了关于消费者的数据

正常工作。在同一个消费者组中没有重叠的分区被使用。用户1和用户2不能同时读取分区5;

分区分配在多个主题之间是不一致的,除非您手动将它们分配给消费者。消费者不知道你的生产者的分区方案,理想情况下也不应该知道。

您将在每个分区内按消息排序,但是,每个部署的消费者应该只关心这个,而不是从特定分区读取特定的键;这将帮助您更好地扩展和处理消费者组重新平衡

正如评论中提到的,也许你可以使用Kafka Streams来代替,并执行groupByKey操作来在你的消费者实例中打乱数据

最新更新