如何在 confluent kafka python 中读取批处理消息?



我正在尝试读取来自Kafka的消息,所以我编写了简单的消费者来读取来自Kafka的消息。

While True:
message = consumer.poll(timeout=1.0)
# i am doing something with messages

在上面的代码中,消息类型的输出是消息对象。 如何获取消息数组?

有没有可能??

注意:仅基本消费者配置不多。

librdkafka(底层 C 库)只将消息一个接一个地返回给应用程序,但在内部,消息是从代理批量获取的,因此没有性能下降。消息在内部缓冲区中排队,等待应用轮询。

有一些配置可以调整行为:

fetch.wait.max.ms(默认值 100),提供给代理累积要发送的数据的时间fetch.message.max.bytes(默认1048576,1GB),批处理的最大大小queued.max.messages.kbytes(默认值 1000000),内部队列中数据的最大大小。如果不定期轮询,则不会从队列中清除数据,并且将无法获取更多数据。

您可以在这里找到许多其他内容:https://github.com/edenhill/librdkafka/blob/0.11.0.x/CONFIGURATION.md


如果你真的因为处理数据的方式而想要一个数据数组,你可以做的是像你一样在循环中以低超时调用轮询,并在你有 x 条消息或 y 毫秒后停止循环,将它们累积到集合中。处理生成的数组并重复循环。

生成也是如此:您逐个生成数据,但消息在发送到代理之前被批处理。

我认为你可以使用consumer.consumption(batch_size,timeout=1)

batch_size = 10
while True:

# Poll for new messages
messages = consumer.consume(batch_size, timeout=1)
if messages is None:
continue
if not messages:
# No messages received
continue
for message in messages:
if message.error():
print(f"Error consuming message: {message.error()}")
else:
print(f"Received message: {message.value()}")

最新更新