当Kafka Consumer订阅活动时,会显示Timed out 1 in flight



这个脚本是从Kafka订阅事件的方法。

using Confluent.Kafka;
using Confluent.Kafka.Serialization;
static void Main(string[] args)
{
string brokerList = "broker";
var topics = new List<string>() { "topicName" };
var config = new Dictionary<string, object>
{
{ "group.id", "ConsumerGroup" },
{ "bootstrap.servers", brokerList },
{ "auto.offset.reset", "earliest" },
{ "enable.auto.commit", false }
};
using (var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
{
consumer.OnMessage += (obj, msg) =>
{
...
};
consumer.Subscribe(topics);
while (true)
{
consumer.Poll(TimeSpan.FromMilliseconds(1000));
}
}
}

当我在调试模式下跟踪代码时,订阅事件的顺序是:

  1. consumer.Subscribe(topics)
  2. consumer.Poll(TimeSpan.FromMilliseconds(1000));
  3. consumer.OnMessage += (obj, msg) =>

在获取新事件(转到consumer.OnMessage(之前,它花了一点时间轮询(在consumer.Poll中(并在控制台窗口上打印一些信息。

如下:

4|2018-12-12 10:41:53.381|rdkafka#consumer-1|REQTMOUT| [thrd:broker/bootstrap]: broker/bootstrap: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests

在我最初的想法中,它使用consumer.Subscribe(topics)来连接broker,并使用consumer.Poll来消费新事件。

consumer.Poll似乎包括连接到broker和消费新事件。

我的问题是:

  1. 哪个函数可以连接到broker?consumer.Subscribeconsumer.Poll
  2. 为什么consumer.Poll会在控制台窗口打印信息?而且似乎有一些错误(飞行中超时1(
关于你的第一个问题。

哪个函数可以连接到broker?消费者订阅或消费者。民意调查还是?

consumer.Subscribe连接到broker,consumer.Poll使用消息。

关于第二个。

为什么是消费者。轮询打印控制台窗口上的信息?而且似乎有一些错误(飞行中超时1(。

Confluent.Kafka在这里发布了一个新的主要版本。如果您对以前的版本有任何问题,可以使用推荐的版本。基于GitHub存储库:

它比0.11.x版本的有更多的功能,得到了显著的改进,性能更高

相关内容

最新更新