调试浮士德流处理 - 从主题开头重新启动应用程序



我正在调试一个简单的应用程序:

import faust
app = faust.App('app08')
# want to start from the beginning of the 
# topic every time the application restarts
@app.agent(topic) 
async def process(stream):
async for event in stream:
print(event)

并希望在重新启动此应用程序时,代理可以从最早的偏移量读取。现在,它很聪明,知道最后一条消息读取的位置,并在重新启动时从该位置开始。尽管搜索了一段时间的文档,但我找不到如何做到这一点的示例。我知道该怎么做的唯一方法是更改应用程序名称,例如:app08app09

请记住,偏移量由 Kafka 服务器使用与您的浮士德应用程序同名的消费者组控制,我一直在使用kafaka-consumer-groupsCLI(您的 kafka 安装的一部分(来执行此操作。

kafka-consumer-groups --bootstrap-server kafka_bootstrap --reset-offsets --to-earliest --group faust_appname --execute --all-topics

您还可以将--to-earliest替换为--to-datetime,并以2020-09-20T00:00:00.00格式提供时间戳(如果您正在运行相对较新版本的 Kafka(。

如果你想自动化这个,我也有Python API来自动控制消费者组。

最新更新