为什么 .Net Core 控制台中的 ProducerBuilder 类的消息没有被 Kafka consumer shell 使用?



我在Ubuntu上安装了Kafka,并设法用shell文件测试了一个简单的场景:

  • 使用默认配置启动zookeeper
  • 使用默认配置启动节点或代理
  • 创建了一个主题,为其命名,使用单个分区和1的复制因子,并将其与zookeeper默认地址关联
  • 打开了生产者和消费者:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopicname

然后是:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopicname --from-beginning

一切都好,一切都好。现在,我想关闭生产者,并在asp.net核心应用程序中为生产者编写代码。

using System;
using System.ComponentModel;
using System.Net;
using Confluent.Kafka;
namespace KafkaTraining
{
class Program
{
static void Main(string[] args)
{
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092"
};

using (var producer = new ProducerBuilder<string, string>(config).Build())
{
producer.Produce("mytopicname", new Message<string, string> { Value = "a log message" });
}
Console.ReadLine();
}
}
}

灵感来源于此://https://docs.confluent.io/current/clients/dotnet.html#

因此,上面的C#代码应该相当于shell命令,对吧,加上我在C#中写的应该由使用者使用的消息(由于shell命令,使用者仍然在终端窗口中打开(。

如果我之前发布的shell命令有效,即bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopicname,只有与localhost:9092(服务地址(和主题名称相关的附加信息,为什么C#程序不能成功地替换它,它应该会产生消息";一个日志消息";并且终端应该消费它,但是不消费。我该如何调试它?

PS。我已经在Linux Ubuntu上安装了Kafka,地址如下:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.12-2.6.0.tgz而在我的Asp.Net Core 3.1控制台应用程序中,我安装了1.5.0版本,你可以看到,我不确定这是否起到了作用,但不知道如何开始调试。。。thansk查找任何指针。

<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<StartupObject>KafkaDemo.Program</StartupObject>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.5.0" />
</ItemGroup>
</Project>

您的消息没有被生成,因为您在消息传递之前就处理了生产者。Produce方法异步发送消息,并且不等待响应——它会立即返回。为了确保它被发送,您可以使用Flush(),它会阻塞直到所有飞行中的消息都被发送,或者您可以使用await ProduceAsync()

试试这个:

using (var producer = new ProducerBuilder<string, string>(config).Build())
{
producer.Produce("mytopicname", new Message<string, string> { Value = "a log message" });
producer.Flush();
}

最新更新