参与一个挑战,它说:你的第一步-使用Apache Kafka中的数据样本所以他们给了我主题名、API_KEY和API _SECRET。哦,还有引导服务器。然后他们声称,如果你不熟悉卡夫卡,Confluent提供了全面的文档。所以好吧,登录到汇流,形成一个集群。。使用数据的下一步是什么?
下面是一个将来自Kafka的消息放入Python列表的基本模式。
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'someTopicName',
bootstrap_servers=['192.168.1.160:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')))
print("We have a consumer instantiated")
print(consumer)
messageCache = []
for message in consumer:
messageCache.append(message.value)
在这种情况下,我的Kafka代理在我的私人局域网上,使用默认端口,所以我的引导服务器列表只是["192.168.1.160:9092"]
您可以使用标准计数器和if语句将列表保存到文件或其他文件中,因为Kafka流被假定为永远运行。例如,我有一个进程,它使用Kafka消息,并将它们保存为镶木地板中的数据帧,以每1000000条消息保存一次HDFS。在这种情况下,我想保存历史消息来开发ML模型。卡夫卡的伟大之处在于,我可以编写另一个过程,实时评估并潜在地响应每一条消息。