随机分区程序不会在 Kafka 主题分区之间分发消息



我在 Kafka 中创建了一个包含 9 个分区的主题,将其恰当地命名为"test",并使用客户端库在 C# (.NET Core) 中将两个简单的应用程序组合在一起Confluent.Kafka客户端库:生产者和消费者。我只做了调整文档中的示例。

我正在运行消费者应用程序的两个实例和一个生产者实例。我认为在这里粘贴消费者代码没有多大意义,这是一个微不足道的"获取消息,在屏幕上打印"应用程序,但是,它也确实打印了消息来自的分区号。

这是制作者应用程序:

static async Task Main(string[] args)
{
var random = new Random();
var config = new ProducerConfig {
BootstrapServers = "10.0.0.5:9092",
Partitioner = Partitioner.ConsistentRandom
};
int counter = 0;
while (true)
{
using (var p = new ProducerBuilder<string, string>(config).Build())
{
try
{
p.BeginProduce(
"test",
new Message<string, string>
{
//Key = random.Next().ToString(),
Value = $"test {++counter}"
});
if (counter % 10 == 0)
p.Flush();
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
}

问题:如果未设置消息的Key属性,则所有消息都将发送到分区号 7,这意味着我的一个使用者实例处于空闲状态。我不得不手动随机化密钥才能在分区之间分配它们(请参阅注释掉的行)。(从文档中复制的原始代码使用Null作为密钥的类型,这也将所有消息发送到第 7 个分区。

为什么?根据ProducerConfig.Partitioner属性的文档,如果未指定键,consistent_random选项应确保随机分布。我尝试使用Partioner.Random选项,无论键如何,它都应该使用随机分布,但这没有帮助。

这是预期的行为,是我做错了什么,还是遇到了错误?

我使用的是Confluent.Kafka NuGet的1.0.0-RC2版本。

分区程序配置的完整文档:

// Summary:
//     Partitioner: `random` - random distribution, `consistent` - CRC32 hash of key
//     (Empty and NULL keys are mapped to single partition), `consistent_random` - CRC32
//     hash of key (Empty and NULL keys are randomly partitioned), `murmur2` - Java
//     Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition),
//     `murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are
//     randomly partitioned. This is functionally equivalent to the default partitioner
//     in the Java Producer.). default: consistent_random importance: high

我遇到了同样的问题。 似乎在启动客户端时,第一条消息将始终转到同一分区。 Partioner.Random 将工作,如果您对所有消息使用相同的客户端

最新更新