我是python的新手,所以这可能是一个简单的问题,但我有一个kafka消费者,我从中读取消息。每次有新消息进来时,它都会将之前的消息改写成顺序。json文件,但我想附加它。此外,我希望使用某种可能的暂停确保读取消息的速度不超过每1秒。任何关于如何做到这一点的建议将是非常感激的。这是我当前的代码
for message in consumer:
with open('order.json', 'w') as file:
file.write(message.value.decode('UTF-8'))
以append
模式打开文件,因为' w'
(写模式)每次存在时都会截断文件
for message in consumer:
with open('order.json', 'a') as file:
file.write(message.value.decode('UTF-8'))
您希望以追加模式打开文件。此外,您可能不希望在每个消息上打开文件,因为它可能是一个昂贵的操作(例如,您将需要更改文件元数据,如每次关闭文件时修改时间):
# open file in append mode, once
with open('order.json', 'a') as file:
for message in consumer:
file.write(message.value.decode('UTF-8'))
关于速率限制,你可以从一些简单的东西开始,像下面这样:
import time
def ratelimit(it, sleep=1.0):
for v in it:
yield v
time.sleep(sleep)
if __name__ == '__main__':
for i in ratelimit(range(10)):
print(i)
这将确保来自迭代器的连续值之间至少有 1秒的延迟。下面是一个演示速率限制器的演示。
在循环外打开:
with open('order.json', 'w') as file:
for message in consumer:
file.write(message.value.decode('UTF-8'))
或者打开外部并使用a
,如果你要为每次运行附加:
with open('order.json', 'a') as file:
for message in consumer:
file.write(message.value.decode('UTF-8'))