两个代理在一个 kafka 主题上具有不同的过滤器.在浮士德流中的致谢



我想让两个浮士德代理侦听同一个 kafka 主题,但每个代理在处理事件之前都使用自己的过滤器,并且它们的事件集不相交。

在文档中,我们有一个示例: https://faust.readthedocs.io/en/latest/userguide/streams.html#id4

如果两个代理使用订阅同一主题的流:

topic = app.topic('orders')
@app.agent(topic)
async def processA(stream):
async for value in stream:
print(f'A: {value}')
@app.agent(topic)
async def processB(stream):
async for value in stream:
print(f'B: {value}')

售票员将转发在"订单"上收到的每条消息 主题,每当 它进入代理流。

当确认事件时,引用计数会减少,当 当达到零时,消费者将认为该偏移量为"完成",并且 可以提交它。

以下是过滤器 https://faust.readthedocs.io/en/latest/userguide/streams.html#id13:

@app.agent() async def process(stream):
async for value in stream.filter(lambda: v > 1000).group_by(...):
...

我使用了一些复杂的过滤器,但结果是将流分成两部分,用于具有完全不同的逻辑的两个代理。(我不用group_by(

如果两个代理一起工作,一切正常。但是,如果我停止它们并重新启动每个将从头开始处理流。因为每个事件都没有被其中一个代理确认。 如果我确认每个代理中的所有事件,如果其中一个代理不会启动,第二个代理将清理主题。(如果一个被压碎并重新启动,则导体将看到三个订阅者,因为它正在等待 20 分钟的压碎代理响应(。

我只想将事件分为两部分。在这种情况下,如何进行适当的同步?

faust

过滤在确认过滤掉的事件时有一些错误。我建议在从流中使用时不使用fault.filter()功能,而是使用简单的if...then...else语句样式,类似于以下内容:

@app.agent(topic)
async def process(stream):
async for event in stream:
if event.amount >= 300.0:
yield event

最新更新