FAUST异步kafka消息处理并发性不工作



目前,我正在尝试从kafka主题读取数据,并与从kafk主题获取的数据异步调用rest-API。如果消息是Meher,则rest api立即给出响应,否则响应将花费5秒

卡夫卡数据

Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher

下面是代码:

app = faust.App(
'faustApp',
broker="kafka://localhost:9092",
value_serializer='raw',
)
app_topic = app.topic('topic_base')
@app.agent(app_topic,concurrency=1)
async def imports_news(articles):
async for article in articles:
val = article.decode('utf-8')
url = 'http://0.0.0.0:5050/' + val
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(data)
if __name__ == '__main__':
app.main()

电流输出:

Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!

预期输出:

Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!

预期的是先获得即时响应的所有其余调用的响应,之后应该是延迟响应,但目前它是按顺序工作的。

如果我将并发性增加到5,它将给出预期的输出,但在并发性为1的情况下应该使用相同的输出。不确定,如果我错过了什么。。。。有什么帮助吗?

更新1:

我已经用普通的python asyncIO尝试过同样的方法。。它正在按预期工作

import asyncio
import aiofiles
import aiohttp
async def get_player(player_name):
url = 'http://0.0.0.0:5050/' + player_name
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
data = await resp.text()
print(data)

loop = asyncio.get_event_loop()
player_args = ["Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo",
"Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo",
"Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo"]
loop.run_until_complete(
asyncio.gather(
*(get_player(args) for args in player_args)
)
)

来自faust文档https://faust.readthedocs.io/en/latest/userguide/agents.html#id5似乎每个代理实例一次处理流的一个元素。流迭代不会在其可用元素上并行化,但单个代理实例将按顺序逐个处理流元素。

如果在处理流的某个元素时等待,则代理实例将不会移动到下一个元素(如果可用(,直到该元素的处理完成。等待操作不会";解锁";代理,将其移动到下一个流元素,然后在第一次等待完成后恢复对第一个元素的处理。

另一方面,如果您将concurrency设置为5,则有5个实例可以从流中提取项目,并同时并行处理它们。

Asyncio.gather之所以工作,是因为协程被封装到任务中,并同时运行,等待它们的结果。

最新更新