对于事件驱动的Kafka消费者,是否有Python API



我一直在尝试构建一个将Kafka作为唯一接口的Flask应用程序。出于这个原因,我想要一个Kafka消费者,当相关主题的流中有新消息时,它会被触发,并通过将消息推回到Kafka流来做出响应。

我一直在寻找类似Spring实现的东西:

@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message) {
System.out.println("Received Messasge in group mygroup: " + message);
}

我看过:

  1. kafka python
  2. 皮卡夫卡
  3. 合流卡夫卡

但我在Python中找不到任何与事件驱动的实现风格相关的东西。

以下是@MickaelMaison的答案给出的想法的实现。我用的是卡夫卡蟒蛇。

from kafka import KafkaConsumer
import threading
BOOTSTRAP_SERVERS = ['localhost:9092']
def register_kafka_listener(topic, listener):
# Poll kafka
def poll():
# Initialize consumer Instance
consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)
print("About to start polling for topic:", topic)
consumer.poll(timeout_ms=6000)
print("Started Polling for topic:", topic)
for msg in consumer:
print("Entered the loopnKey: ",msg.key," Value:", msg.value)
kafka_listener(msg)
print("About to register listener to topic:", topic)
t1 = threading.Thread(target=poll)
t1.start()
print("started a background thread")
def kafka_listener(data):
print("Image Ratings:n", data.value.decode("utf-8"))
register_kafka_listener('topic1', kafka_listener)

轮询是在另一个线程中完成的。一旦接收到消息,就会通过传递从Kafka检索到的数据来调用监听器。

Kafka Consumer必须连续轮询才能从代理中检索数据。

Spring为您提供了这种新奇的API,但在底层,它在循环中调用poll,并且只在检索记录时调用您的方法。

您可以使用您提到的任何Python客户端轻松地构建类似的东西。与Java一样,这不是(大多数(Kafka客户端直接公开的API,而是由上层提供的东西。这是您需要构建的东西。

最新更新