当以编程方式构建代理时,无法使用webview向faust代理/主题发送消息



我正试图以编程方式构建代理,并一直遵循此示例。当我看到在脚本开头生成的faust -A <> agents时,我可以看到主题和代理。例如,代理名称类似于{topic}_agent。所有这些代理分别访问一些rest api。代码如下:

def create_agent(next_topic, model_metadata: Dict):
""" 
creation of a single agent.
`start_topic`:  str
Just a string that you can use in other functions
to figure out how messages in that topic can be
transformed
`next_topic`:  faust.topics.Topic
A faust `app.topic` instance
"""
async def agent(stream):
"""Call domino model and return response
"""
async for message in stream:
domino_req = {"data": message.asdict()}
app.logger.info(domino_req)
response = requests.post(
model_metadata['url'],
auth=(
model_metadata['auth'],
model_metadata['auth'],
),
json=domino_req,
)
if response.status_code == 200:
answer = response.json()
await next_topic.send(answer['result'])
else:
app.logger.error(response.reason)
# app.logger.info(f"NEW Agent Created: ## Agent - {consumer} ##")
return agent
def agents(registry_response):
"""
configuration of multiple agents
"""
agents = []
for key, value in registry_response.items():
""" `topic`:  app.topic instance """
agents.append(
# `topic.start`: str
# `topic.next`: faust.topics.Topic
(create_agent(next_topic= all_responses, model_metadata = value),key)
)
return agents
def attach_agent(agent, topic):
""" Attach the agent to the Faust app """
# `topic.faust`: faust.topics.Topic
# it is equivalent to `app.topic(topic.start)`
print("hello")
app.agent(channel=app.topic(topic), name=f"{topic}_agent")(agent)
# new_agent.start()
# app.logger.info("hello")
# app.logger.info(new_agent)
# app.logger.info(new_agent.info())
# app.logger.info(app.agents)
# @app.task
# async def get_model_registry():
#     """
#     Create topics and agents for the initial set of models present in the registry
#     """
app.logger.info('APP STARTED')
app.logger.info('Fetching Models from Model Registry')
#TODO: Call the Model Registry and process it
#Just mocking the registry 
registry_response = initial_model_registry_metadata
app.logger.info(f'Number of Models Found {len(registry_response)}')
for agent,topic in agents(registry_response):
attach_agent(agent, topic)
@app.page('/model/{key_ai_model}')
class frontdoor(View):
async def get(self, request: Request) -> Response:
return self.json({'key_ai_model': key_ai_model})
async def post(self, request: Request, key_ai_model: str) -> Response:
request_id = str(uuid.uuid4())
src = await request.json()
msg = GdeltRequest(**src)
app.logger.info(msg)
await gdelt_agent.cast(msg, key=request_id)
return self.json({'request_id': request_id, 'key_ai_model': key_ai_model})
@app.agent(all_responses)
async def print_responses(stream):
async for message in stream:
print(message)

然而,当我发布的东西,我得到一个错误-NameError: name 'gdelt_agent' is not defined。如能提供帮助,我将不胜感激。

我已经能够通过将代理分配为字典中的值并稍后使用它将消息强制传递给代理来弄清楚这一点。

agents_dict = {}
topics_dict ={}
def attach_agent(agent, topic):
""" Attach the agent to the Faust app """
# `topic.faust`: faust.topics.Topic
# it is equivalent to `app.topic(topic.start)`
print("hello")
topics_dict[topic] = app.topic(topic)
agents_dict[topic] = app.agent(channel=topics_dict[topic], name=f"{topic}_agent")(agent)
agents_dict[topic].start()

谢谢大家。

最新更新