我在 Kafka 中创建了一个包含 9 个分区的主题,将其恰当地命名为"test",并使用客户端库在 C# (.NET Core) 中将两个简单的应用程序组合在一起Confluent.Kafka
客户端库:生产者和消费者。我只做了调整文档中的示例。
我正在运行消费者应用程序的两个实例和一个生产者实例。我认为在这里粘贴消费者代码没有多大意义,这是一个微不足道的"获取消息,在屏幕上打印"应用程序,但是,它也确实打印了消息来自的分区号。
这是制作者应用程序:
static async Task Main(string[] args)
{
var random = new Random();
var config = new ProducerConfig {
BootstrapServers = "10.0.0.5:9092",
Partitioner = Partitioner.ConsistentRandom
};
int counter = 0;
while (true)
{
using (var p = new ProducerBuilder<string, string>(config).Build())
{
try
{
p.BeginProduce(
"test",
new Message<string, string>
{
//Key = random.Next().ToString(),
Value = $"test {++counter}"
});
if (counter % 10 == 0)
p.Flush();
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
}
问题:如果未设置消息的Key
属性,则所有消息都将发送到分区号 7,这意味着我的一个使用者实例处于空闲状态。我不得不手动随机化密钥才能在分区之间分配它们(请参阅注释掉的行)。(从文档中复制的原始代码使用Null
作为密钥的类型,这也将所有消息发送到第 7 个分区。
为什么?根据ProducerConfig.Partitioner
属性的文档,如果未指定键,consistent_random
选项应确保随机分布。我尝试使用Partioner.Random
选项,无论键如何,它都应该使用随机分布,但这没有帮助。
这是预期的行为,是我做错了什么,还是遇到了错误?
我使用的是Confluent.Kafka NuGet的1.0.0-RC2版本。
分区程序配置的完整文档:
// Summary:
// Partitioner: `random` - random distribution, `consistent` - CRC32 hash of key
// (Empty and NULL keys are mapped to single partition), `consistent_random` - CRC32
// hash of key (Empty and NULL keys are randomly partitioned), `murmur2` - Java
// Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition),
// `murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are
// randomly partitioned. This is functionally equivalent to the default partitioner
// in the Java Producer.). default: consistent_random importance: high
我遇到了同样的问题。 似乎在启动客户端时,第一条消息将始终转到同一分区。 Partioner.Random 将工作,如果您对所有消息使用相同的客户端