这个脚本是从Kafka订阅事件的方法。
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
static void Main(string[] args)
{
string brokerList = "broker";
var topics = new List<string>() { "topicName" };
var config = new Dictionary<string, object>
{
{ "group.id", "ConsumerGroup" },
{ "bootstrap.servers", brokerList },
{ "auto.offset.reset", "earliest" },
{ "enable.auto.commit", false }
};
using (var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
{
consumer.OnMessage += (obj, msg) =>
{
...
};
consumer.Subscribe(topics);
while (true)
{
consumer.Poll(TimeSpan.FromMilliseconds(1000));
}
}
}
当我在调试模式下跟踪代码时,订阅事件的顺序是:
consumer.Subscribe(topics)
consumer.Poll(TimeSpan.FromMilliseconds(1000));
consumer.OnMessage += (obj, msg) =>
在获取新事件(转到consumer.OnMessage
(之前,它花了一点时间轮询(在consumer.Poll
中(并在控制台窗口上打印一些信息。
如下:
4|2018-12-12 10:41:53.381|rdkafka#consumer-1|REQTMOUT| [thrd:broker/bootstrap]: broker/bootstrap: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
在我最初的想法中,它使用consumer.Subscribe(topics)
来连接broker,并使用consumer.Poll
来消费新事件。
但consumer.Poll
似乎包括连接到broker和消费新事件。
我的问题是:
- 哪个函数可以连接到broker?
consumer.Subscribe
或consumer.Poll
或 - 为什么
consumer.Poll
会在控制台窗口打印信息?而且似乎有一些错误(飞行中超时1(
哪个函数可以连接到broker?消费者订阅或消费者。民意调查还是?
consumer.Subscribe
连接到broker,consumer.Poll
使用消息。
关于第二个。
为什么是消费者。轮询打印控制台窗口上的信息?而且似乎有一些错误(飞行中超时1(。
Confluent.Kafka在这里发布了一个新的主要版本。如果您对以前的版本有任何问题,可以使用推荐的版本。基于GitHub存储库:
它比0.11.x版本的有更多的功能,得到了显著的改进,性能更高